chore: Make use of japi.function in stream api. (#2143)

This commit is contained in:
He-Pin(kerr) 2025-09-05 03:24:26 +08:00 committed by GitHub
parent 70a9f092dd
commit 6f2a65da4c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
29 changed files with 182 additions and 133 deletions

View file

@ -105,7 +105,7 @@ trait Predicate2[-T1, -T2] extends java.io.Serializable {
/**
* A constructor/factory, takes no parameters but creates a new value of type T every call.
* Supports throwing `Exception` in the apply, which the `java.util.function.Supplier` counterpart does not.
* Supports throwing `Exception` in the create method, which the `java.util.function.Supplier` counterpart does not.
*/
@nowarn("msg=@SerialVersionUID has no effect")
@SerialVersionUID(1L)

View file

@ -19,7 +19,7 @@ This operator is included in:
## Signature
@apidoc[ActorFlow.ask](ActorFlow$) { scala="#ask%5BI,Q,A](ref:org.apache.pekko.actor.typed.ActorRef%5BQ])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef%5BA])=%3EQ)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow%5BI,A,org.apache.pekko.NotUsed]" java="#ask(org.apache.pekko.actor.typed.ActorRef,java.time.Duration,java.util.function.BiFunction)" }
@apidoc[ActorFlow.ask](ActorFlow$) { scala="#ask%5BI,Q,A](ref:org.apache.pekko.actor.typed.ActorRef%5BQ])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef%5BA])=%3EQ)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow%5BI,A,org.apache.pekko.NotUsed]" java="#ask(org.apache.pekko.actor.typed.ActorRef,java.time.Duration,org.apache.pekko.japi.function.Function2)" }
## Description

View file

@ -19,7 +19,7 @@ This operator is included in:
## Signature
@apidoc[ActorFlow.askWithContext](ActorFlow$) { scala="#askWithContext%5BI,Q,A,Ctx](ref:org.apache.pekko.actor.typed.ActorRef%5BQ])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef%5BA])=%3EQ)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow%5B(I,Ctx),(A,Ctx),org.apache.pekko.NotUsed]" java="#askWithContext(org.apache.pekko.actor.typed.ActorRef,java.time.Duration,java.util.function.BiFunction)" }
@apidoc[ActorFlow.askWithContext](ActorFlow$) { scala="#askWithContext%5BI,Q,A,Ctx](ref:org.apache.pekko.actor.typed.ActorRef%5BQ])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef%5BA])=%3EQ)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow%5B(I,Ctx),(A,Ctx),org.apache.pekko.NotUsed]" java="#askWithContext(org.apache.pekko.actor.typed.ActorRef,java.time.Duration,org.apache.pekko.japi.function.Function2)" }
## Description

View file

@ -18,8 +18,8 @@ This operator is included in:
## Signature
@apidoc[ActorFlow.askWithStatus](ActorFlow$) { scala="#askWithStatus[I,Q,A](parallelism:Int)(ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=>Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[I,A,org.apache.pekko.NotUsed]" java ="#askWithStatus[I,Q,A](parallelism:Int,ref:org.apache.pekko.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:java.util.function.BiFunction[I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]],Q]):org.apache.pekko.stream.javadsl.Flow[I,A,org.apache.pekko.NotUsed]" }
@apidoc[ActorFlow.askWithStatus](ActorFlow$) { scala="#askWithStatus[I,Q,A](ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=>Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[I,A,org.apache.pekko.NotUsed]" java ="#askWithStatus[I,Q,A](ref:org.apache.pekko.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:java.util.function.BiFunction[I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]],Q]):org.apache.pekko.stream.javadsl.Flow[I,A,org.apache.pekko.NotUsed]" }
@apidoc[ActorFlow.askWithStatus](ActorFlow$) { scala="#askWithStatus[I,Q,A](parallelism:Int)(ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=>Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[I,A,org.apache.pekko.NotUsed]" java ="#askWithStatus[I,Q,A](parallelism:Int,ref:org.apache.pekko.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:org.apache.pekko.japi.function.Function2[I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]],Q]):org.apache.pekko.stream.javadsl.Flow[I,A,org.apache.pekko.NotUsed]" }
@apidoc[ActorFlow.askWithStatus](ActorFlow$) { scala="#askWithStatus[I,Q,A](ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=>Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[I,A,org.apache.pekko.NotUsed]" java ="#askWithStatus[I,Q,A](ref:org.apache.pekko.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:org.apache.pekko.japi.function.Function2[I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]],Q]):org.apache.pekko.stream.javadsl.Flow[I,A,org.apache.pekko.NotUsed]" }
## Description

View file

@ -19,7 +19,7 @@ This operator is included in:
## Signature
@apidoc[ActorFlow.askWithStatusAndContext](ActorFlow$) { scala="#askWithStatusAndContext[I,Q,A,Ctx](parallelism:Int)(ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=>Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[(I,Ctx),(A,Ctx),org.apache.pekko.NotUsed]" java ="#askWithStatusAndContext[I,Q,A,Ctx](parallelism:Int,ref:org.apache.pekko.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:java.util.function.BiFunction[I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]],Q])" }
@apidoc[ActorFlow.askWithStatusAndContext](ActorFlow$) { scala="#askWithStatusAndContext[I,Q,A,Ctx](parallelism:Int)(ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=>Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[(I,Ctx),(A,Ctx),org.apache.pekko.NotUsed]" java ="#askWithStatusAndContext[I,Q,A,Ctx](parallelism:Int,ref:org.apache.pekko.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:org.apache.pekko.japi.function.Function2[I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]],Q])" }
## Description

View file

@ -19,7 +19,7 @@ This operator is included in:
## Signature
@apidoc[ActorSource.actorRef](ActorSource$) { scala="#actorRef[T](completionMatcher:PartialFunction[T,Unit],failureMatcher:PartialFunction[T,Throwable],bufferSize:Int,overflowStrategy:org.apache.pekko.stream.OverflowStrategy):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.actor.typed.ActorRef[T]]" java="#actorRef(java.util.function.Predicate,org.apache.pekko.japi.function.Function,int,org.apache.pekko.stream.OverflowStrategy)" }
@apidoc[ActorSource.actorRef](ActorSource$) { scala="#actorRef[T](completionMatcher:PartialFunction[T,Unit],failureMatcher:PartialFunction[T,Throwable],bufferSize:Int,overflowStrategy:org.apache.pekko.stream.OverflowStrategy):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.actor.typed.ActorRef[T]]" java="#actorRef(org.apache.pekko.japi.function.Predicate,org.apache.pekko.japi.function.Function,int,org.apache.pekko.stream.OverflowStrategy)" }
## Description

View file

@ -6,7 +6,7 @@ Defer the creation of a `Sink` until materialization and access `Materializer` a
## Signature
@apidoc[Sink.fromMaterializer](Sink$) { scala="#fromMaterializer[T,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=>org.apache.pekko.stream.scaladsl.Sink[T,M]):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[M]]" java="#fromMaterializer(java.util.function.BiFunction)" }
@apidoc[Sink.fromMaterializer](Sink$) { scala="#fromMaterializer[T,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=>org.apache.pekko.stream.scaladsl.Sink[T,M]):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[M]]" java="#fromMaterializer(org.apache.pekko.japi.function.Function2)" }
## Description

View file

@ -8,8 +8,8 @@ Aggregate and emit until custom boundary condition met.
## Signature
@apidoc[Source.aggregateWithBoundary](Source) { scala="#aggregateWithBoundary[Agg,Emit](allocate:()=%3EAgg)(aggregate:(Agg,Out)=%3E(Agg,Boolean),harvest:Agg=%3EEmit,emitOnTimer:Option[(Agg=%3EBoolean,scala.concurrent.duration.FiniteDuration)]):FlowOps.this.Repr[Emit]" java="#aggregateWithBoundary(java.util.function.Supplier,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.Pair)"}
@apidoc[Flow.aggregateWithBoundary](Flow) { scala="#aggregateWithBoundary[Agg,Emit](allocate:()=%3EAgg)(aggregate:(Agg,Out)=%3E(Agg,Boolean),harvest:Agg=%3EEmit,emitOnTimer:Option[(Agg=%3EBoolean,scala.concurrent.duration.FiniteDuration)]):FlowOps.this.Repr[Emit]" java="#aggregateWithBoundary(java.util.function.Supplier,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.Pair)" }
@apidoc[Source.aggregateWithBoundary](Source) { scala="#aggregateWithBoundary[Agg,Emit](allocate:()=%3EAgg)(aggregate:(Agg,Out)=%3E(Agg,Boolean),harvest:Agg=%3EEmit,emitOnTimer:Option[(Agg=%3EBoolean,scala.concurrent.duration.FiniteDuration)]):FlowOps.this.Repr[Emit]" java="#aggregateWithBoundary(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.Pair)"}
@apidoc[Flow.aggregateWithBoundary](Flow) { scala="#aggregateWithBoundary[Agg,Emit](allocate:()=%3EAgg)(aggregate:(Agg,Out)=%3E(Agg,Boolean),harvest:Agg=%3EEmit,emitOnTimer:Option[(Agg=%3EBoolean,scala.concurrent.duration.FiniteDuration)]):FlowOps.this.Repr[Emit]" java="#aggregateWithBoundary(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.Pair)" }
## Description

View file

@ -6,8 +6,8 @@ Delay every element passed through with a duration that can be controlled dynami
## Signature
@apidoc[Source.delayWith](Source) { scala="#delayWith(delayStrategySupplier:()=>org.apache.pekko.stream.scaladsl.DelayStrategy[Out],overFlowStrategy:org.apache.pekko.stream.DelayOverflowStrategy):FlowOps.this.Repr[Out]" java="#delayWith(java.util.function.Supplier,org.apache.pekko.stream.DelayOverflowStrategy)" }
@apidoc[Flow.delayWith](Flow) { scala="#delayWith(delayStrategySupplier:()=>org.apache.pekko.stream.scaladsl.DelayStrategy[Out],overFlowStrategy:org.apache.pekko.stream.DelayOverflowStrategy):FlowOps.this.Repr[Out]" java="#delayWith(java.util.function.Supplier,org.apache.pekko.stream.DelayOverflowStrategy)" }
@apidoc[Source.delayWith](Source) { scala="#delayWith(delayStrategySupplier:()=>org.apache.pekko.stream.scaladsl.DelayStrategy[Out],overFlowStrategy:org.apache.pekko.stream.DelayOverflowStrategy):FlowOps.this.Repr[Out]" java="#delayWith(org.apache.pekko.japi.function.Creator,org.apache.pekko.stream.DelayOverflowStrategy)" }
@apidoc[Flow.delayWith](Flow) { scala="#delayWith(delayStrategySupplier:()=>org.apache.pekko.stream.scaladsl.DelayStrategy[Out],overFlowStrategy:org.apache.pekko.stream.DelayOverflowStrategy):FlowOps.this.Repr[Out]" java="#delayWith(org.apache.pekko.japi.function.Creator,org.apache.pekko.stream.DelayOverflowStrategy)" }
## Description

View file

@ -6,8 +6,8 @@ Defer the creation of a `Source/Flow` until materialization and access `Material
## Signature
@apidoc[Source.fromMaterializer](Source$) { scala="#fromMaterializer[T,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=>org.apache.pekko.stream.scaladsl.Source[T,M]):org.apache.pekko.stream.scaladsl.Source[T,scala.concurrent.Future[M]]" java="#fromMaterializer(java.util.function.BiFunction)" }
@apidoc[Flow.fromMaterializer](Flow$) { scala="#fromMaterializer[T,U,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=>org.apache.pekko.stream.scaladsl.Flow[T,U,M]):org.apache.pekko.stream.scaladsl.Flow[T,U,scala.concurrent.Future[M]]" java="#fromMaterializer(java.util.function.BiFunction)" }
@apidoc[Source.fromMaterializer](Source$) { scala="#fromMaterializer[T,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=>org.apache.pekko.stream.scaladsl.Source[T,M]):org.apache.pekko.stream.scaladsl.Source[T,scala.concurrent.Future[M]]" java="#fromMaterializer(org.apache.pekko.japi.function.Function2)" }
@apidoc[Flow.fromMaterializer](Flow$) { scala="#fromMaterializer[T,U,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=>org.apache.pekko.stream.scaladsl.Flow[T,U,M]):org.apache.pekko.stream.scaladsl.Flow[T,U,scala.concurrent.Future[M]]" java="#fromMaterializer(org.apache.pekko.japi.function.Function2)" }
## Description

View file

@ -6,9 +6,9 @@ Allows completing the stream when an upstream error occurs.
## Signature
@apidoc[Source.onErrorComplete](Source) { scala="#onErrorComplete(pf%3A%20PartialFunction%5BThrowable%2C%20Boolean%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorComplete(java.util.function.Predicate)" }
@apidoc[Source.onErrorComplete](Source) { scala="#onErrorComplete(pf%3A%20PartialFunction%5BThrowable%2C%20Boolean%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorComplete(org.apache.pekko.japi.function.Predicate)" }
@apidoc[Source.onErrorComplete](Source) { scala="#onErrorComplete%5BT%20%3C%3A%20Throwable%5D()(implicit%20tag%3A%20ClassTag%5BT%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorComplete(java.lang.Class)" }
@apidoc[Flow.onErrorComplete](Flow) { scala="#onErrorComplete(pf%3A%20PartialFunction%5BThrowable%2C%20Boolean%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorComplete(java.util.function.Predicate)" }
@apidoc[Flow.onErrorComplete](Flow) { scala="#onErrorComplete(pf%3A%20PartialFunction%5BThrowable%2C%20Boolean%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorComplete(org.apache.pekko.japi.function.Predicate)" }
@apidoc[Flow.onErrorComplete](Flow) { scala="#onErrorComplete%5BT%20%3C%3A%20Throwable%5D()(implicit%20tag%3A%20ClassTag%5BT%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorComplete(java.lang.Class)" }
## Description

View file

@ -6,8 +6,8 @@ Allow sending of one last element downstream when a failure has happened upstrea
## Signature
@apidoc[Source.recover](Source) { scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]" java="#recover(scala.PartialFunction)" java="#recover(java.lang.Class,java.util.function.Supplier)" }
@apidoc[Flow.recover](Flow) { scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]" java="#recover(scala.PartialFunction)" java="#recover(java.lang.Class,java.util.function.Supplier)" }
@apidoc[Source.recover](Source) { scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]" java="#recover(scala.PartialFunction)" java="#recover(java.lang.Class,org.apache.pekko.japi.function.Creator)" }
@apidoc[Flow.recover](Flow) { scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]" java="#recover(scala.PartialFunction)" java="#recover(java.lang.Class,org.apache.pekko.japi.function.Creator)" }
## Description

View file

@ -6,8 +6,8 @@ Allow switching to alternative Source when a failure has happened upstream.
## Signature
@apidoc[Source.recoverWith](Source) { scala="#recoverWith[T>:Out](pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWith(java.lang.Class,java.util.function.Supplier)" }
@apidoc[Flow.recoverWith](Flow) { scala="#recoverWith[T>:Out](pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWith(java.lang.Class,java.util.function.Supplier)" }
@apidoc[Source.recoverWith](Source) { scala="#recoverWith[T>:Out](pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWith(java.lang.Class,org.apache.pekko.japi.function.Creator)" }
@apidoc[Flow.recoverWith](Flow) { scala="#recoverWith[T>:Out](pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWith(java.lang.Class,org.apache.pekko.japi.function.Creator)" }
## Description

View file

@ -6,8 +6,8 @@ RecoverWithRetries allows to switch to alternative Source on flow failure.
## Signature
@apidoc[Source.recoverWithRetries](Source) { scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWithRetries(int,java.lang.Class,java.util.function.Supplier)" }
@apidoc[Flow.recoverWithRetries](Flow) { scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWithRetries(int,java.lang.Class,java.util.function.Supplier)" }
@apidoc[Source.recoverWithRetries](Source) { scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWithRetries(int,java.lang.Class,org.apache.pekko.japi.function.Creator)" }
@apidoc[Flow.recoverWithRetries](Flow) { scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWithRetries(int,java.lang.Class,org.apache.pekko.japi.function.Creator)" }
## Description

View file

@ -219,7 +219,7 @@ It is possible to define how many initial consumers that are required before it
to the attached consumers. While not enough consumers have been attached messages are buffered and when the
buffer is full the upstream producer is backpressured. No messages are dropped.
The above example illustrate a stateless partition function. For more advanced stateful routing the @java[@javadoc[ofStateful](pekko.stream.javadsl.PartitionHub$#ofStateful(java.lang.Class,java.util.function.Supplier,int))]
The above example illustrate a stateless partition function. For more advanced stateful routing the @java[@javadoc[ofStateful](pekko.stream.javadsl.PartitionHub$#ofStateful(java.lang.Class,org.apache.pekko.japi.function.Creator,int))]
@scala[@scaladoc[statefulSink](pekko.stream.scaladsl.PartitionHub$#statefulSink[T](partitioner:()=%3E(org.apache.pekko.stream.scaladsl.PartitionHub.ConsumerInfo,T)=%3ELong,startAfterNrOfConsumers:Int,bufferSize:Int):org.apache.pekko.stream.scaladsl.Sink[T,org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.NotUsed]])] can be used. Here is an example of a stateful round-robin function:
Scala

View file

@ -20,8 +20,8 @@ Each of the operators downstream gets informed about the failure and each upstre
In many cases you may want to avoid complete stream failure, this can be done in a few different ways:
* @apidoc[recover](stream.*.Source) {scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]" java="#recover(java.lang.Class,java.util.function.Supplier)"} to emit a final element then complete the stream normally on upstream failure
* @apidoc[recoverWithRetries](stream.*.Source) {scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWithRetries(int,java.lang.Class,java.util.function.Supplier)"} to create a new upstream and start consuming from that on failure
* @apidoc[recover](stream.*.Source) {scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]" java="#recover(java.lang.Class,org.apache.pekko.japi.function.Creator)"} to emit a final element then complete the stream normally on upstream failure
* @apidoc[recoverWithRetries](stream.*.Source) {scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWithRetries(int,java.lang.Class,org.apache.pekko.japi.function.Creator)"} to create a new upstream and start consuming from that on failure
* Restarting sections of the stream after a backoff
* Using a supervision strategy for operators that support it
@ -52,7 +52,7 @@ in @ref:[Logging in streams](stream-cookbook.md#logging-in-streams).
## Recover
@apidoc[recover](stream.*.Source) {scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]" java="#recover(java.lang.Class,java.util.function.Supplier)"} allows you to emit a final element and then complete the stream on an upstream failure.
@apidoc[recover](stream.*.Source) {scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]" java="#recover(java.lang.Class,org.apache.pekko.japi.function.Creator)"} allows you to emit a final element and then complete the stream on an upstream failure.
Deciding which exceptions should be recovered is done through a @scaladoc[PartialFunction](scala.PartialFunction). If an exception
does not have a @scala[matching case] @java[match defined] the stream is failed.
@ -80,7 +80,7 @@ Java
## Recover with retries
@apidoc[recoverWithRetries](stream.*.Source) {scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWithRetries(int,java.lang.Class,java.util.function.Supplier)"} allows you to put a new upstream in place of the failed one, recovering
@apidoc[recoverWithRetries](stream.*.Source) {scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWithRetries(int,java.lang.Class,org.apache.pekko.japi.function.Creator)"} allows you to put a new upstream in place of the failed one, recovering
stream failures up to a specified maximum number of times.
Deciding which exceptions should be recovered is done through a @scaladoc[PartialFunction](scala.PartialFunction). If an exception

View file

@ -0,0 +1,28 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Migrate java.util.function.* to pekko.japi.function.*
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithContext")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithStatusAndContext")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithContext")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithStatus")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.ask")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithStatus")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithContext")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithStatusAndContext")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithContext")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorSource.actorRef")

View file

@ -13,12 +13,12 @@
package org.apache.pekko.stream.typed.javadsl
import java.util.function.BiFunction
import scala.concurrent.duration._
import org.apache.pekko
import pekko.NotUsed
import pekko.actor.typed.ActorRef
import pekko.japi.Pair
import pekko.japi.function
import pekko.pattern.StatusReply
import pekko.stream.javadsl.Flow
import pekko.util.JavaDurationConverters
@ -68,7 +68,7 @@ object ActorFlow {
def ask[I, Q, A](
ref: ActorRef[Q],
timeout: java.time.Duration,
makeMessage: BiFunction[I, ActorRef[A], Q]): Flow[I, A, NotUsed] =
makeMessage: function.Function2[I, ActorRef[A], Q]): Flow[I, A, NotUsed] =
org.apache.pekko.stream.typed.scaladsl.ActorFlow
.ask[I, Q, A](parallelism = 2)(ref)((i, ref) => makeMessage(i, ref))(
JavaDurationConverters.asFiniteDuration(timeout))
@ -82,7 +82,7 @@ object ActorFlow {
def askWithStatus[I, Q, A](
ref: ActorRef[Q],
timeout: java.time.Duration,
makeMessage: BiFunction[I, ActorRef[StatusReply[A]], Q]): Flow[I, A, NotUsed] =
makeMessage: function.Function2[I, ActorRef[StatusReply[A]], Q]): Flow[I, A, NotUsed] =
org.apache.pekko.stream.typed.scaladsl.ActorFlow
.askWithStatus[I, Q, A](parallelism = 2)(ref)((i, ref) => makeMessage(i, ref))(
JavaDurationConverters.asFiniteDuration(timeout))
@ -139,7 +139,7 @@ object ActorFlow {
parallelism: Int,
ref: ActorRef[Q],
timeout: java.time.Duration,
makeMessage: BiFunction[I, ActorRef[StatusReply[A]], Q]): Flow[I, A, NotUsed] =
makeMessage: function.Function2[I, ActorRef[StatusReply[A]], Q]): Flow[I, A, NotUsed] =
org.apache.pekko.stream.typed.scaladsl.ActorFlow
.askWithStatus[I, Q, A](parallelism)(ref)((i, ref) => makeMessage(i, ref))(timeout.toMillis.millis)
.asJava
@ -150,7 +150,7 @@ object ActorFlow {
def askWithContext[I, Q, A, Ctx](
ref: ActorRef[Q],
timeout: java.time.Duration,
makeMessage: BiFunction[I, ActorRef[A], Q]): Flow[Pair[I, Ctx], Pair[A, Ctx], NotUsed] =
makeMessage: function.Function2[I, ActorRef[A], Q]): Flow[Pair[I, Ctx], Pair[A, Ctx], NotUsed] =
org.apache.pekko.stream.scaladsl
.Flow[Pair[I, Ctx]]
.map(_.toScala)
@ -169,7 +169,7 @@ object ActorFlow {
def askWithStatusAndContext[I, Q, A, Ctx](
ref: ActorRef[Q],
timeout: java.time.Duration,
makeMessage: BiFunction[I, ActorRef[StatusReply[A]], Q]): Flow[Pair[I, Ctx], Pair[A, Ctx], NotUsed] =
makeMessage: function.Function2[I, ActorRef[StatusReply[A]], Q]): Flow[Pair[I, Ctx], Pair[A, Ctx], NotUsed] =
org.apache.pekko.stream.scaladsl
.Flow[Pair[I, Ctx]]
.map(_.toScala)
@ -187,7 +187,7 @@ object ActorFlow {
parallelism: Int,
ref: ActorRef[Q],
timeout: java.time.Duration,
makeMessage: BiFunction[I, ActorRef[A], Q]): Flow[Pair[I, Ctx], Pair[A, Ctx], NotUsed] = {
makeMessage: function.Function2[I, ActorRef[A], Q]): Flow[Pair[I, Ctx], Pair[A, Ctx], NotUsed] = {
org.apache.pekko.stream.scaladsl
.Flow[Pair[I, Ctx]]
.map(_.toScala)

View file

@ -13,11 +13,10 @@
package org.apache.pekko.stream.typed.javadsl
import java.util.function.Predicate
import org.apache.pekko
import pekko.actor.typed._
import pekko.japi.JavaPartialFunction
import pekko.japi.function
import pekko.stream.{ CompletionStrategy, OverflowStrategy }
import pekko.stream.javadsl._
@ -58,7 +57,7 @@ object ActorSource {
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def actorRef[T](
completionMatcher: Predicate[T],
completionMatcher: function.Predicate[T],
failureMatcher: pekko.japi.function.Function[T, java.util.Optional[Throwable]],
bufferSize: Int,
overflowStrategy: OverflowStrategy): Source[T, ActorRef[T]] = {

View file

@ -18,3 +18,30 @@
# Migrate functions in japi.* to japi.function.*
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.JavaFlowSupport#Flow.fromProcessor")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.JavaFlowSupport#Flow.fromProcessorMat")
# Migrate java.util.function.* to pekko.japi.function.*
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.RestartSettings.withRestartOn")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.groupedWeighted")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.delayWith")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.recover")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.recoverWith")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.recoverWithRetries")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.aggregateWithBoundary")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.fromMaterializer")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.PartitionHub.of")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.PartitionHub.ofStateful")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Sink.fromMaterializer")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.recover")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.recoverWith")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.recoverWithRetries")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.groupedWeighted")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.delayWith")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.aggregateWithBoundary")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.fromMaterializer")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.SubFlow.delayWith")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.SubFlow.aggregateWithBoundary")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.SubSource.delayWith")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.SubSource.aggregateWithBoundary")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.TLS.create")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Tcp.outgoingConnectionWithTls")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Tcp.bindWithTls")

View file

@ -16,10 +16,10 @@ package org.apache.pekko.stream
import scala.concurrent.duration.FiniteDuration
import org.apache.pekko
import pekko.japi.function
import pekko.event.Logging
import pekko.event.Logging.LogLevel
import pekko.util.ConstantFun
import pekko.util.FunctionConverters._
import pekko.util.JavaDurationConverters._
final class RestartSettings private (
@ -58,8 +58,8 @@ final class RestartSettings private (
copy(maxRestarts = count, maxRestartsWithin = within.asScala)
/** Decides whether the failure should restart the stream or make the surrounding stream fail */
def withRestartOn(restartOn: java.util.function.Predicate[Throwable]): RestartSettings =
copy(restartOn = restartOn.asScala)
def withRestartOn(restartOn: function.Predicate[Throwable]): RestartSettings =
copy(restartOn = t => restartOn.test(t))
def withLogSettings(newLogSettings: RestartSettings.LogSettings): RestartSettings =
copy(logSettings = newLogSettings)

View file

@ -16,8 +16,6 @@ package org.apache.pekko.stream.javadsl
import java.util.Comparator
import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.BiFunction
import java.util.function.Supplier
import scala.annotation.varargs
import scala.annotation.unchecked.uncheckedVariance
@ -112,7 +110,7 @@ object Flow {
* [[Attributes]] of the [[Flow]] returned by this method.
*/
def fromMaterializer[I, O, M](
factory: BiFunction[Materializer, Attributes, Flow[I, O, M]]): Flow[I, O, CompletionStage[M]] =
factory: function.Function2[Materializer, Attributes, Flow[I, O, M]]): Flow[I, O, CompletionStage[M]] =
scaladsl.Flow.fromMaterializer((mat, attr) => factory(mat, attr).asScala).mapMaterializedValue(_.asJava).asJava
/**
@ -1273,7 +1271,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* '''Cancels when''' downstream cancels
*/
def groupedWeighted(minWeight: Long,
costFn: java.util.function.Function[Out, java.lang.Long]): javadsl.Flow[In, java.util.List[Out], Mat] =
costFn: function.Function[Out, java.lang.Long]): javadsl.Flow[In, java.util.List[Out], Mat] =
new Flow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step
/**
@ -1316,7 +1314,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*/
def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
maxWeight: Long,
costFn: java.util.function.Function[Out, java.lang.Long])
costFn: function.Function[Out, java.lang.Long])
: javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] =
new Flow(delegate.groupedAdjacentByWeighted(f.apply, maxWeight)(costFn.apply).map(_.asJava))
@ -1740,13 +1738,13 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*
* @param delayStrategySupplier creates new [[DelayStrategy]] object for each materialization
* @param delayStrategyCreator creates new [[DelayStrategy]] object for each materialization
* @param overFlowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def delayWith(
delayStrategySupplier: Supplier[DelayStrategy[Out]],
delayStrategyCreator: function.Creator[DelayStrategy[Out]],
overFlowStrategy: DelayOverflowStrategy): Flow[In, Out, Mat] =
new Flow(delegate.delayWith(() => DelayStrategy.asScala(delayStrategySupplier.get), overFlowStrategy))
new Flow(delegate.delayWith(() => DelayStrategy.asScala(delayStrategyCreator.create()), overFlowStrategy))
/**
* Discard the given number of elements at the beginning of the stream.
@ -1892,9 +1890,9 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
def recover(clazz: Class[_ <: Throwable], supplier: Supplier[Out]): javadsl.Flow[In, Out, Mat] =
def recover(clazz: Class[_ <: Throwable], creator: function.Creator[Out]): javadsl.Flow[In, Out, Mat] =
recover {
case elem if clazz.isInstance(elem) => supplier.get()
case elem if clazz.isInstance(elem) => creator.create()
}
/**
@ -1984,9 +1982,9 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*/
def recoverWith(
clazz: Class[_ <: Throwable],
supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, Out, Mat] =
creator: function.Creator[Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, Out, Mat] =
recoverWith({
case elem if clazz.isInstance(elem) => supplier.get()
case elem if clazz.isInstance(elem) => creator.create()
}: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]])
/**
@ -2043,15 +2041,15 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* @param attempts Maximum number of retries or -1 to retry indefinitely
* @param clazz the class object of the failure cause
* @param supplier supply the new Source to be materialized
* @param creator supply the new Source to be materialized
*/
def recoverWithRetries(
attempts: Int,
clazz: Class[_ <: Throwable],
supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, Out, Mat] =
creator: function.Creator[Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, Out, Mat] =
recoverWithRetries(attempts,
{
case elem if clazz.isInstance(elem) => supplier.get()
case elem if clazz.isInstance(elem) => creator.create()
})
/**
@ -2104,7 +2102,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* '''Cancels when''' downstream cancels
* @since 1.1.0
*/
def onErrorComplete(predicate: java.util.function.Predicate[_ >: Throwable]): javadsl.Flow[In, Out, Mat] =
def onErrorComplete(predicate: function.Predicate[_ >: Throwable]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.onErrorComplete {
case ex: Throwable if predicate.test(ex) => true
})
@ -4308,13 +4306,13 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
@deprecated("Use the overloaded one which accepts an Optional instead.", since = "1.2.0")
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration])
emitOnTimer: Pair[function.Predicate[Agg], java.time.Duration])
: javadsl.Flow[In, Emit, Mat] = {
asScala
.aggregateWithBoundary(() => allocate.get())(
.aggregateWithBoundary(() => allocate.create())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
harvest = agg => harvest.apply(agg),
emitOnTimer = Option(emitOnTimer).map {
@ -4341,14 +4339,14 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* @param harvest this is invoked before emit within the current stage/operator
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg], java.time.Duration]])
emitOnTimer: Optional[Pair[function.Predicate[Agg], java.time.Duration]])
: javadsl.Flow[In, Emit, Mat] = {
import org.apache.pekko.util.OptionConverters._
asScala
.aggregateWithBoundary(() => allocate.get())(
.aggregateWithBoundary(() => allocate.create())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
harvest = agg => harvest.apply(agg),
emitOnTimer = emitOnTimer.toScala.map {

View file

@ -13,9 +13,10 @@
package org.apache.pekko.stream.javadsl
import java.util.function.{ BiFunction, Supplier, ToLongBiFunction }
import java.util.function.ToLongBiFunction
import org.apache.pekko
import pekko.japi.function
import pekko.NotUsed
import pekko.annotation.DoNotInherit
import pekko.util.unused
@ -259,11 +260,11 @@ object PartitionHub {
*/
def ofStateful[T](
@unused clazz: Class[T],
partitioner: Supplier[ToLongBiFunction[ConsumerInfo, T]],
partitioner: function.Creator[ToLongBiFunction[ConsumerInfo, T]],
startAfterNrOfConsumers: Int,
bufferSize: Int): Sink[T, Source[T, NotUsed]] = {
val p: () => (pekko.stream.scaladsl.PartitionHub.ConsumerInfo, T) => Long = () => {
val f = partitioner.get()
val f = partitioner.create()
(info, elem) => f.applyAsLong(info, elem)
}
pekko.stream.scaladsl.PartitionHub
@ -303,7 +304,7 @@ object PartitionHub {
*/
def ofStateful[T](
clazz: Class[T],
partitioner: Supplier[ToLongBiFunction[ConsumerInfo, T]],
partitioner: function.Creator[ToLongBiFunction[ConsumerInfo, T]],
startAfterNrOfConsumers: Int): Sink[T, Source[T, NotUsed]] =
ofStateful(clazz, partitioner, startAfterNrOfConsumers, pekko.stream.scaladsl.PartitionHub.defaultBufferSize)
@ -338,7 +339,7 @@ object PartitionHub {
*/
def of[T](
@unused clazz: Class[T],
partitioner: BiFunction[Integer, T, Integer],
partitioner: function.Function2[Integer, T, Integer],
startAfterNrOfConsumers: Int,
bufferSize: Int): Sink[T, Source[T, NotUsed]] =
pekko.stream.scaladsl.PartitionHub
@ -377,7 +378,7 @@ object PartitionHub {
*/
def of[T](
clazz: Class[T],
partitioner: BiFunction[Integer, T, Integer],
partitioner: function.Function2[Integer, T, Integer],
startAfterNrOfConsumers: Int): Sink[T, Source[T, NotUsed]] =
of(clazz, partitioner, startAfterNrOfConsumers, pekko.stream.scaladsl.PartitionHub.defaultBufferSize)

View file

@ -15,7 +15,6 @@ package org.apache.pekko.stream.javadsl
import java.util.Optional
import java.util.concurrent.{ CompletableFuture, CompletionStage }
import java.util.function.BiFunction
import java.util.stream.Collector
import scala.annotation.unchecked.uncheckedVariance
@ -391,7 +390,8 @@ object Sink {
* exposes [[Materializer]] which is going to be used during materialization and
* [[Attributes]] of the [[Sink]] returned by this method.
*/
def fromMaterializer[T, M](factory: BiFunction[Materializer, Attributes, Sink[T, M]]): Sink[T, CompletionStage[M]] =
def fromMaterializer[T, M](
factory: function.Function2[Materializer, Attributes, Sink[T, M]]): Sink[T, CompletionStage[M]] =
scaladsl.Sink.fromMaterializer((mat, attr) => factory(mat, attr).asScala).mapMaterializedValue(_.asJava).asJava
/**

View file

@ -16,7 +16,6 @@ package org.apache.pekko.stream.javadsl
import java.util
import java.util.Optional
import java.util.concurrent.{ CompletableFuture, CompletionStage }
import java.util.function.{ BiFunction, Supplier }
import scala.annotation.varargs
import scala.annotation.unchecked.uncheckedVariance
@ -507,7 +506,7 @@ object Source {
* [[Attributes]] of the [[Source]] returned by this method.
*/
def fromMaterializer[T, M](
factory: BiFunction[Materializer, Attributes, Source[T, M]]): Source[T, CompletionStage[M]] =
factory: function.Function2[Materializer, Attributes, Source[T, M]]): Source[T, CompletionStage[M]] =
scaladsl.Source.fromMaterializer((mat, attr) => factory(mat, attr).asScala).mapMaterializedValue(_.asJava).asJava
/**
@ -2129,9 +2128,9 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
def recover(clazz: Class[_ <: Throwable], supplier: Supplier[Out]): javadsl.Source[Out, Mat] =
def recover(clazz: Class[_ <: Throwable], creator: function.Creator[Out]): javadsl.Source[Out, Mat] =
recover {
case elem if clazz.isInstance(elem) => supplier.get()
case elem if clazz.isInstance(elem) => creator.create()
}
/**
@ -2221,9 +2220,9 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*/
def recoverWith(
clazz: Class[_ <: Throwable],
supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] =
creator: function.Creator[Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] =
recoverWith({
case elem if clazz.isInstance(elem) => supplier.get()
case elem if clazz.isInstance(elem) => creator.create()
}: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]])
/**
@ -2277,15 +2276,15 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* @param attempts Maximum number of retries or -1 to retry indefinitely
* @param clazz the class object of the failure cause
* @param supplier supply the new Source to be materialized
* @param creator create the new Source to be materialized
*/
def recoverWithRetries(
attempts: Int,
clazz: Class[_ <: Throwable],
supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] =
creator: function.Creator[Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] =
recoverWithRetries(attempts,
{
case elem if clazz.isInstance(elem) => supplier.get()
case elem if clazz.isInstance(elem) => creator.create()
}: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]])
/**
@ -2338,7 +2337,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* '''Cancels when''' downstream cancels
* @since 1.1.0
*/
def onErrorComplete(predicate: java.util.function.Predicate[_ >: Throwable]): javadsl.Source[Out, Mat] =
def onErrorComplete(predicate: function.Predicate[_ >: Throwable]): javadsl.Source[Out, Mat] =
new Source(delegate.onErrorComplete {
case ex: Throwable if predicate.test(ex) => true
})
@ -2988,7 +2987,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
def groupedWeighted(minWeight: Long, costFn: java.util.function.Function[Out, java.lang.Long])
def groupedWeighted(minWeight: Long, costFn: function.Function[Out, java.lang.Long])
: javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
new Source(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava))
@ -3031,7 +3030,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*/
def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
maxWeight: Long,
costFn: java.util.function.Function[Out, java.lang.Long])
costFn: function.Function[Out, java.lang.Long])
: javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
new Source(delegate.groupedAdjacentByWeighted(f.apply, maxWeight)(costFn.apply).map(_.asJava))
@ -3444,13 +3443,13 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*
* @param delayStrategySupplier creates new [[DelayStrategy]] object for each materialization
* @param delayStrategyCreator creates new [[DelayStrategy]] object for each materialization
* @param overFlowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def delayWith(
delayStrategySupplier: Supplier[DelayStrategy[Out]],
delayStrategyCreator: function.Creator[DelayStrategy[Out]],
overFlowStrategy: DelayOverflowStrategy): Source[Out, Mat] =
new Source(delegate.delayWith(() => DelayStrategy.asScala(delayStrategySupplier.get), overFlowStrategy))
new Source(delegate.delayWith(() => DelayStrategy.asScala(delayStrategyCreator.create()), overFlowStrategy))
/**
* Discard the given number of elements at the beginning of the stream.
@ -4749,12 +4748,12 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
@deprecated("Use the overloaded one which accepts an Optional instead.", since = "1.2.0")
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Source[Emit, Mat] = {
emitOnTimer: Pair[function.Predicate[Agg], java.time.Duration]): javadsl.Source[Emit, Mat] = {
asScala
.aggregateWithBoundary(() => allocate.get())(
.aggregateWithBoundary(() => allocate.create())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
harvest = agg => harvest.apply(agg),
emitOnTimer = Option(emitOnTimer).map {
@ -4781,13 +4780,13 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* @param harvest this is invoked before emit within the current stage/operator
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg], java.time.Duration]]): javadsl.Source[Emit, Mat] = {
emitOnTimer: Optional[Pair[function.Predicate[Agg], java.time.Duration]]): javadsl.Source[Emit, Mat] = {
import org.apache.pekko.util.OptionConverters._
asScala
.aggregateWithBoundary(() => allocate.get())(
.aggregateWithBoundary(() => allocate.create())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
harvest = agg => harvest.apply(agg),
emitOnTimer = emitOnTimer.toScala.map {

View file

@ -15,7 +15,6 @@ package org.apache.pekko.stream.javadsl
import java.util.{ Comparator, Optional }
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
import scala.annotation.varargs
import scala.annotation.unchecked.uncheckedVariance
@ -717,7 +716,7 @@ final class SubFlow[In, Out, Mat](
*/
def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
maxWeight: Long,
costFn: java.util.function.Function[Out, java.lang.Long])
costFn: function.Function[Out, java.lang.Long])
: SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] =
new SubFlow(delegate.groupedAdjacentByWeighted(f.apply, maxWeight)(costFn.apply).map(_.asJava))
@ -1134,13 +1133,13 @@ final class SubFlow[In, Out, Mat](
*
* '''Cancels when''' downstream cancels
*
* @param delayStrategySupplier creates new [[DelayStrategy]] object for each materialization
* @param delayStrategyCreator creates new [[DelayStrategy]] object for each materialization
* @param overFlowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def delayWith(
delayStrategySupplier: Supplier[DelayStrategy[Out]],
delayStrategyCreator: function.Creator[DelayStrategy[Out]],
overFlowStrategy: DelayOverflowStrategy): SubFlow[In, Out, Mat] =
new SubFlow(delegate.delayWith(() => DelayStrategy.asScala(delayStrategySupplier.get), overFlowStrategy))
new SubFlow(delegate.delayWith(() => DelayStrategy.asScala(delayStrategyCreator.create()), overFlowStrategy))
/**
* Discard the given number of elements at the beginning of the stream.
@ -1367,7 +1366,7 @@ final class SubFlow[In, Out, Mat](
* '''Cancels when''' downstream cancels
* @since 1.1.0
*/
def onErrorComplete(predicate: java.util.function.Predicate[_ >: Throwable]): SubFlow[In, Out, Mat] =
def onErrorComplete(predicate: function.Predicate[_ >: Throwable]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.onErrorComplete {
case ex: Throwable if predicate.test(ex) => true
})
@ -2790,13 +2789,13 @@ final class SubFlow[In, Out, Mat](
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
@deprecated("Use the overloaded one which accepts an Optional instead.", since = "1.2.0")
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration])
emitOnTimer: Pair[function.Predicate[Agg], java.time.Duration])
: javadsl.SubFlow[In, Emit, Mat] = {
new SubFlow(
asScala.aggregateWithBoundary(() => allocate.get())(
asScala.aggregateWithBoundary(() => allocate.create())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
harvest = agg => harvest.apply(agg),
emitOnTimer = Option(emitOnTimer).map {
@ -2822,14 +2821,14 @@ final class SubFlow[In, Out, Mat](
* @param harvest this is invoked before emit within the current stage/operator
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg], java.time.Duration]])
emitOnTimer: Optional[Pair[function.Predicate[Agg], java.time.Duration]])
: javadsl.SubFlow[In, Emit, Mat] = {
import org.apache.pekko.util.OptionConverters._
new SubFlow(
asScala.aggregateWithBoundary(() => allocate.get())(
asScala.aggregateWithBoundary(() => allocate.create())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
harvest = agg => harvest.apply(agg),
emitOnTimer = emitOnTimer.toScala.map {

View file

@ -15,7 +15,6 @@ package org.apache.pekko.stream.javadsl
import java.util.{ Comparator, Optional }
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
import scala.annotation.varargs
import scala.annotation.unchecked.uncheckedVariance
@ -707,7 +706,7 @@ final class SubSource[Out, Mat](
*/
def groupedAdjacentByWeighted[R](f: function.Function[Out, R],
maxWeight: Long,
costFn: java.util.function.Function[Out, java.lang.Long])
costFn: function.Function[Out, java.lang.Long])
: SubSource[java.util.List[Out @uncheckedVariance], Mat] =
new SubSource(delegate.groupedAdjacentByWeighted(f.apply, maxWeight)(costFn.apply).map(_.asJava))
@ -1226,13 +1225,13 @@ final class SubSource[Out, Mat](
*
* '''Cancels when''' downstream cancels
*
* @param delayStrategySupplier creates new [[DelayStrategy]] object for each materialization
* @param delayStrategyCreator creates new [[DelayStrategy]] object for each materialization
* @param overFlowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def delayWith(
delayStrategySupplier: Supplier[DelayStrategy[Out]],
delayStrategyCreator: function.Creator[DelayStrategy[Out]],
overFlowStrategy: DelayOverflowStrategy): SubSource[Out, Mat] =
new SubSource(delegate.delayWith(() => DelayStrategy.asScala(delayStrategySupplier.get), overFlowStrategy))
new SubSource(delegate.delayWith(() => DelayStrategy.asScala(delayStrategyCreator.create()), overFlowStrategy))
/**
* Recover allows to send last element on failure and gracefully complete the stream
@ -1344,7 +1343,7 @@ final class SubSource[Out, Mat](
* '''Cancels when''' downstream cancels
* @since 1.1.0
*/
def onErrorComplete(predicate: java.util.function.Predicate[_ >: Throwable]): SubSource[Out, Mat] =
def onErrorComplete(predicate: function.Predicate[_ >: Throwable]): SubSource[Out, Mat] =
new SubSource(delegate.onErrorComplete {
case ex: Throwable if predicate.test(ex) => true
})
@ -2764,13 +2763,13 @@ final class SubSource[Out, Mat](
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
@deprecated("Use the overloaded one which accepts an Optional instead.", since = "1.2.0")
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration])
emitOnTimer: Pair[function.Predicate[Agg], java.time.Duration])
: javadsl.SubSource[Emit, Mat] = {
new SubSource(
asScala.aggregateWithBoundary(() => allocate.get())(
asScala.aggregateWithBoundary(() => allocate.create())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
harvest = agg => harvest.apply(agg),
emitOnTimer = Option(emitOnTimer).map {
@ -2796,14 +2795,14 @@ final class SubSource[Out, Mat](
* @param harvest this is invoked before emit within the current stage/operator
* @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval
*/
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg],
def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg],
aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]],
harvest: function.Function[Agg, Emit],
emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg], java.time.Duration]])
emitOnTimer: Optional[Pair[function.Predicate[Agg], java.time.Duration]])
: javadsl.SubSource[Emit, Mat] = {
import org.apache.pekko.util.OptionConverters._
new SubSource(
asScala.aggregateWithBoundary(() => allocate.get())(
asScala.aggregateWithBoundary(() => allocate.create())(
aggregate = (agg, out) => aggregate.apply(agg, out).toScala,
harvest = agg => harvest.apply(agg),
emitOnTimer = emitOnTimer.toScala.map {

View file

@ -13,13 +13,13 @@
package org.apache.pekko.stream.javadsl
import java.util.function.{ Consumer, Supplier }
import javax.net.ssl.{ SSLEngine, SSLSession }
import scala.util.Try
import org.apache.pekko
import pekko.NotUsed
import pekko.japi.function
import pekko.stream._
import pekko.stream.TLSProtocol._
import pekko.util.ByteString
@ -73,11 +73,11 @@ object TLS {
* For a description of the `closing` parameter please refer to [[TLSClosing]].
*/
def create(
sslEngineCreator: Supplier[SSLEngine],
sessionVerifier: Consumer[SSLSession],
sslEngineCreator: function.Creator[SSLEngine],
sessionVerifier: function.Procedure[SSLSession],
closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
new javadsl.BidiFlow(
scaladsl.TLS.apply(() => sslEngineCreator.get(), session => Try(sessionVerifier.accept(session)), closing))
scaladsl.TLS.apply(() => sslEngineCreator.create(), session => Try(sessionVerifier(session)), closing))
/**
* Create a StreamTls [[pekko.stream.javadsl.BidiFlow]]. This is a low-level interface.
@ -88,9 +88,9 @@ object TLS {
* For a description of the `closing` parameter please refer to [[TLSClosing]].
*/
def create(
sslEngineCreator: Supplier[SSLEngine],
sslEngineCreator: function.Creator[SSLEngine],
closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] =
new javadsl.BidiFlow(scaladsl.TLS.apply(() => sslEngineCreator.get(), closing))
new javadsl.BidiFlow(scaladsl.TLS.apply(() => sslEngineCreator.create(), closing))
}
/**

View file

@ -17,8 +17,6 @@ import java.lang.{ Iterable => JIterable }
import java.net.InetSocketAddress
import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.{ Function => JFunction }
import java.util.function.Supplier
import javax.net.ssl.SSLEngine
import javax.net.ssl.SSLSession
@ -35,6 +33,7 @@ import pekko.actor.ExtensionId
import pekko.actor.ExtensionIdProvider
import pekko.annotation.InternalApi
import pekko.io.Inet.SocketOption
import pekko.japi.function
import pekko.stream.Materializer
import pekko.stream.SystemMaterializer
import pekko.stream.TLSClosing
@ -261,10 +260,10 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension {
*/
def outgoingConnectionWithTls(
remoteAddress: InetSocketAddress,
createSSLEngine: Supplier[SSLEngine]): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] =
createSSLEngine: function.Creator[SSLEngine]): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] =
Flow.fromGraph(
delegate
.outgoingConnectionWithTls(remoteAddress, createSSLEngine = () => createSSLEngine.get())
.outgoingConnectionWithTls(remoteAddress, createSSLEngine = () => createSSLEngine.create())
.mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).asJava))
/**
@ -279,18 +278,18 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension {
*/
def outgoingConnectionWithTls(
remoteAddress: InetSocketAddress,
createSSLEngine: Supplier[SSLEngine],
createSSLEngine: function.Creator[SSLEngine],
localAddress: Optional[InetSocketAddress],
options: JIterable[SocketOption],
connectTimeout: Optional[java.time.Duration],
idleTimeout: Optional[java.time.Duration],
verifySession: JFunction[SSLSession, Optional[Throwable]],
verifySession: function.Function[SSLSession, Optional[Throwable]],
closing: TLSClosing): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] = {
Flow.fromGraph(
delegate
.outgoingConnectionWithTls(
remoteAddress,
createSSLEngine = () => createSSLEngine.get(),
createSSLEngine = () => createSSLEngine.create(),
localAddress.toScala,
CollectionUtil.toSeq(options),
optionalDurationToScala(connectTimeout),
@ -313,10 +312,10 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension {
def bindWithTls(
interface: String,
port: Int,
createSSLEngine: Supplier[SSLEngine]): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
createSSLEngine: function.Creator[SSLEngine]): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
Source.fromGraph(
delegate
.bindWithTls(interface, port, createSSLEngine = () => createSSLEngine.get())
.bindWithTls(interface, port, createSSLEngine = () => createSSLEngine.create())
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).asJava))
}
@ -330,18 +329,18 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension {
def bindWithTls(
interface: String,
port: Int,
createSSLEngine: Supplier[SSLEngine],
createSSLEngine: function.Creator[SSLEngine],
backlog: Int,
options: JIterable[SocketOption],
idleTimeout: Optional[java.time.Duration],
verifySession: JFunction[SSLSession, Optional[Throwable]],
verifySession: function.Function[SSLSession, Optional[Throwable]],
closing: TLSClosing): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
Source.fromGraph(
delegate
.bindWithTls(
interface,
port,
createSSLEngine = () => createSSLEngine.get(),
createSSLEngine = () => createSSLEngine.create(),
backlog,
CollectionUtil.toSeq(options),
optionalDurationToScala(idleTimeout),