Implement throttle for Source(Flow)WithContext (#29107)

This commit is contained in:
XIAO Yang 2020-06-08 20:30:42 +09:00 committed by GitHub
parent 34b9a26b98
commit 6328e0a6d6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 449 additions and 0 deletions

View file

@ -0,0 +1,65 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.javadsl;
import akka.NotUsed;
import akka.japi.Pair;
import akka.stream.StreamTest;
import akka.stream.ThrottleMode;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.AkkaSpec;
import org.junit.ClassRule;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
public class FlowWithContextThrottleTest extends StreamTest {
public FlowWithContextThrottleTest() {
super(actorSystemResource);
}
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("ThrottleTest", AkkaSpec.testConf());
@Test
public void mustWorksForTwoStreams() throws Exception {
final FlowWithContext<Integer, String, Integer, String, NotUsed> sharedThrottle =
FlowWithContext.<Integer, String>create()
.throttle(1, java.time.Duration.ofDays(1), 1, (a) -> 1, ThrottleMode.enforcing());
CompletionStage<List<Pair<Integer, String>>> result1 =
Source.single(new Pair<>(1, "context-a"))
.via(sharedThrottle.asFlow())
.via(sharedThrottle.asFlow())
.runWith(Sink.seq(), system);
// If there is accidental shared state then we would not be able to pass through the single
// element
List<Pair<Integer, String>> pairs1 = result1.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(1, pairs1.size());
assertEquals(Integer.valueOf(1), pairs1.get(0).first());
assertEquals("context-a", pairs1.get(0).second());
// It works with a new stream, too
CompletionStage<List<Pair<Integer, String>>> result2 =
Source.single(new Pair<>(2, "context-b"))
.via(sharedThrottle.asFlow())
.via(sharedThrottle.asFlow())
.runWith(Sink.seq(), system);
List<Pair<Integer, String>> pairs2 = result2.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(1, pairs2.size());
assertEquals(Integer.valueOf(2), pairs2.get(0).first());
assertEquals("context-b", pairs2.get(0).second());
}
}

View file

@ -0,0 +1,47 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.javadsl;
import akka.japi.Pair;
import akka.stream.StreamTest;
import akka.stream.ThrottleMode;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.AkkaSpec;
import org.junit.ClassRule;
import org.junit.Test;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
public class SourceWithContextThrottleTest extends StreamTest {
public SourceWithContextThrottleTest() {
super(actorSystemResource);
}
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("ThrottleTest", AkkaSpec.testConf());
@Test
public void mustBeAbleToUseThrottle() throws Exception {
List<Pair<Integer, String>> list =
Arrays.asList(
new Pair<>(0, "context-a"), new Pair<>(1, "context-b"), new Pair<>(2, "context-c"));
Pair<Integer, String> result =
SourceWithContext.fromPairs(Source.from(list))
.throttle(10, Duration.ofSeconds(1), 10, ThrottleMode.shaping())
.throttle(10, Duration.ofSeconds(1), 10, ThrottleMode.enforcing())
.runWith(Sink.head(), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS);
assertEquals(list.get(0), result);
}
}

View file

@ -0,0 +1,208 @@
/*
* Copyright (C) 2015-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.stream.ThrottleMode.Shaping
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.scaladsl.TestSink
import scala.concurrent.duration._
class FlowWithContextThrottleSpec extends StreamSpec("""
akka.stream.materializer.initial-input-buffer-size = 2
akka.stream.materializer.max-input-buffer-size = 2
""") {
private def toMessage(i: Int) = Message(s"data-$i", i.toLong)
private def genMessage(length: Int, i: Int) = Message("a" * length, i.toLong)
"throttle() on FlowWithContextOps" must {
"on FlowWithContext" must {
"work for the happy case" in assertAllStagesStopped {
val throttle = FlowWithContext[Message, Long].throttle(19, 1000.millis, -1, Shaping)
val input = (1 to 5).map(toMessage)
val expected = input.map(message => (message, message.offset))
Source(input)
.asSourceWithContext(m => m.offset)
.via(throttle)
.asSource
.runWith(TestSink.probe[(Message, Long)])
.request(5)
.expectNextN(expected)
.expectComplete()
}
"accept very high rates" in assertAllStagesStopped {
val throttle = FlowWithContext[Message, Long].throttle(1, 1.nanos, 0, Shaping)
val input = (1 to 5).map(toMessage)
val expected = input.map(message => (message, message.offset))
Source(input)
.asSourceWithContext(m => m.offset)
.via(throttle)
.asSource
.runWith(TestSink.probe[(Message, Long)])
.request(5)
.expectNextN(expected)
.expectComplete()
}
"accept very low rates" in assertAllStagesStopped {
val throttle = FlowWithContext[Message, Long].throttle(1, 100.days, 1, Shaping)
val input = (1 to 5).map(toMessage)
val expected = (input.head, input.head.offset)
Source(input)
.asSourceWithContext(m => m.offset)
.via(throttle)
.asSource
.runWith(TestSink.probe[(Message, Long)])
.request(5)
.expectNext(expected)
.expectNoMessage(100.millis)
.cancel() // We won't wait 100 days, sorry
}
"emit single element per tick" in assertAllStagesStopped {
val upstream = TestPublisher.probe[Message]()
val downstream = TestSubscriber.probe[(Message, Long)]()
val throttle = FlowWithContext[Message, Long].throttle(1, 300.millis, 0, Shaping)
Source
.fromPublisher(upstream)
.asSourceWithContext(m => m.offset)
.via(throttle)
.asSource
.runWith(Sink.fromSubscriber(downstream))
downstream.request(20)
upstream.sendNext(Message("a", 1L))
downstream.expectNoMessage(150.millis)
downstream.expectNext((Message("a", 1L), 1L))
upstream.sendNext(Message("b", 2L))
downstream.expectNoMessage(150.millis)
downstream.expectNext((Message("b", 2L), 2L))
upstream.sendComplete()
downstream.expectComplete()
}
"emit elements according to cost" in assertAllStagesStopped {
val list = (1 to 4).map(i => genMessage(i * 2, i))
val throttle = FlowWithContext[Message, Long].throttle(2, 200.millis, 0, _.data.length, Shaping)
Source(list)
.asSourceWithContext(m => m.offset)
.via(throttle)
.asSource
.map(_._1)
.runWith(TestSink.probe[Message])
.request(4)
.expectNext(list(0))
.expectNoMessage(300.millis)
.expectNext(list(1))
.expectNoMessage(500.millis)
.expectNext(list(2))
.expectNoMessage(700.millis)
.expectNext(list(3))
.expectComplete()
}
}
"on SourceWithContext" must {
"work for the happy case" in assertAllStagesStopped {
val input = (1 to 5).map(toMessage)
val expected = input.map(message => (message, message.offset))
Source(input)
.asSourceWithContext(m => m.offset)
.throttle(19, 1000.millis, -1, Shaping)
.asSource
.runWith(TestSink.probe[(Message, Long)])
.request(5)
.expectNextN(expected)
.expectComplete()
}
"accept very high rates" in assertAllStagesStopped {
val input = (1 to 5).map(toMessage)
val expected = input.map(message => (message, message.offset))
Source(input)
.asSourceWithContext(m => m.offset)
.throttle(1, 1.nanos, 0, Shaping)
.asSource
.runWith(TestSink.probe[(Message, Long)])
.request(5)
.expectNextN(expected)
.expectComplete()
}
"accept very low rates" in assertAllStagesStopped {
val input = (1 to 5).map(toMessage)
val expected = (input.head, input.head.offset)
Source(input)
.asSourceWithContext(m => m.offset)
.throttle(1, 100.days, 1, Shaping)
.asSource
.runWith(TestSink.probe[(Message, Long)])
.request(5)
.expectNext(expected)
.expectNoMessage(100.millis)
.cancel() // We won't wait 100 days, sorry
}
"emit single element per tick" in assertAllStagesStopped {
val upstream = TestPublisher.probe[Message]()
val downstream = TestSubscriber.probe[(Message, Long)]()
Source
.fromPublisher(upstream)
.asSourceWithContext(m => m.offset)
.throttle(1, 300.millis, 0, Shaping)
.asSource
.runWith(Sink.fromSubscriber(downstream))
downstream.request(20)
upstream.sendNext(Message("a", 1L))
downstream.expectNoMessage(150.millis)
downstream.expectNext((Message("a", 1L), 1L))
upstream.sendNext(Message("b", 2L))
downstream.expectNoMessage(150.millis)
downstream.expectNext((Message("b", 2L), 2L))
upstream.sendComplete()
downstream.expectComplete()
}
"emit elements according to cost" in assertAllStagesStopped {
val list = (1 to 4).map(i => genMessage(i * 2, i))
Source(list)
.asSourceWithContext(m => m.offset)
.throttle(2, 200.millis, 0, _.data.length, Shaping)
.asSource
.map(_._1)
.runWith(TestSink.probe[Message])
.request(4)
.expectNext(list(0))
.expectNoMessage(300.millis)
.expectNext(list(1))
.expectNoMessage(500.millis)
.expectNext(list(2))
.expectNoMessage(700.millis)
.expectNext(list(3))
.expectComplete()
}
}
}
}

View file

@ -14,6 +14,7 @@ import akka.japi.{ function, Pair, Util }
import akka.stream._
import akka.util.ConstantFun
import akka.util.ccompat.JavaConverters._
import akka.util.JavaDurationConverters._
object FlowWithContext {
@ -286,6 +287,50 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
marker: function.Function2[Out, CtxOut, LogMarker]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], null)
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.throttle]].
*
* @see [[akka.stream.javadsl.Flow.throttle]]
*/
def throttle(elements: Int, per: java.time.Duration): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
viaScala(_.throttle(elements, per.asScala))
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.throttle]].
*
* @see [[akka.stream.javadsl.Flow.throttle]]
*/
def throttle(
elements: Int,
per: java.time.Duration,
maximumBurst: Int,
mode: ThrottleMode): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
viaScala(_.throttle(elements, per.asScala, maximumBurst, mode))
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.throttle]].
*
* @see [[akka.stream.javadsl.Flow.throttle]]
*/
def throttle(
cost: Int,
per: java.time.Duration,
costCalculation: function.Function[Out, Integer]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
viaScala(_.throttle(cost, per.asScala, costCalculation.apply))
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.throttle]].
*
* @see [[akka.stream.javadsl.Flow.throttle]]
*/
def throttle(
cost: Int,
per: java.time.Duration,
maximumBurst: Int,
costCalculation: function.Function[Out, Integer],
mode: ThrottleMode): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
viaScala(_.throttle(cost, per.asScala, maximumBurst, costCalculation.apply, mode))
def asScala: scaladsl.FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
scaladsl.FlowWithContext.fromTuples(
scaladsl

View file

@ -17,6 +17,7 @@ import akka.japi.function
import akka.stream._
import akka.util.ConstantFun
import akka.util.ccompat.JavaConverters._
import akka.util.JavaDurationConverters._
object SourceWithContext {
@ -266,6 +267,50 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
def logWithMarker(name: String, marker: function.Function2[Out, Ctx, LogMarker]): SourceWithContext[Out, Ctx, Mat] =
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], null)
/**
* Context-preserving variant of [[akka.stream.javadsl.Source.throttle]].
*
* @see [[akka.stream.javadsl.Source.throttle]]
*/
def throttle(elements: Int, per: java.time.Duration): SourceWithContext[Out, Ctx, Mat] =
viaScala(_.throttle(elements, per.asScala))
/**
* Context-preserving variant of [[akka.stream.javadsl.Source.throttle]].
*
* @see [[akka.stream.javadsl.Source.throttle]]
*/
def throttle(
elements: Int,
per: java.time.Duration,
maximumBurst: Int,
mode: ThrottleMode): SourceWithContext[Out, Ctx, Mat] =
viaScala(_.throttle(elements, per.asScala, maximumBurst, mode))
/**
* Context-preserving variant of [[akka.stream.javadsl.Source.throttle]].
*
* @see [[akka.stream.javadsl.Source.throttle]]
*/
def throttle(
cost: Int,
per: java.time.Duration,
costCalculation: function.Function[Out, Integer]): SourceWithContext[Out, Ctx, Mat] =
viaScala(_.throttle(cost, per.asScala, costCalculation.apply))
/**
* Context-preserving variant of [[akka.stream.javadsl.Source.throttle]].
*
* @see [[akka.stream.javadsl.Source.throttle]]
*/
def throttle(
cost: Int,
per: java.time.Duration,
maximumBurst: Int,
costCalculation: function.Function[Out, Integer],
mode: ThrottleMode): SourceWithContext[Out, Ctx, Mat] =
viaScala(_.throttle(cost, per.asScala, maximumBurst, costCalculation.apply, mode))
/**
* Connect this [[akka.stream.javadsl.SourceWithContext]] to a [[akka.stream.javadsl.Sink]],
* concatenating the processing steps of both.

View file

@ -7,11 +7,13 @@ package akka.stream.scaladsl
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import akka.NotUsed
import akka.dispatch.ExecutionContexts
import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import akka.stream._
import akka.stream.impl.Throttle
import akka.util.ConstantFun
/**
@ -207,5 +209,42 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
via(flow.logWithMarker(name, marker.tupled, extractWithContext)(log))
}
/**
* Context-preserving variant of [[akka.stream.scaladsl.FlowOps.throttle]].
*
* @see [[akka.stream.scaladsl.FlowOps.throttle]]
*/
def throttle(elements: Int, per: FiniteDuration): Repr[Out, Ctx] =
throttle(elements, per, Throttle.AutomaticMaximumBurst, ConstantFun.oneInt, ThrottleMode.Shaping)
/**
* Context-preserving variant of [[akka.stream.scaladsl.FlowOps.throttle]].
*
* @see [[akka.stream.scaladsl.FlowOps.throttle]]
*/
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Repr[Out, Ctx] =
throttle(elements, per, maximumBurst, ConstantFun.oneInt, mode)
/**
* Context-preserving variant of [[akka.stream.scaladsl.FlowOps.throttle]].
*
* @see [[akka.stream.scaladsl.FlowOps.throttle]]
*/
def throttle(cost: Int, per: FiniteDuration, costCalculation: (Out) => Int): Repr[Out, Ctx] =
throttle(cost, per, Throttle.AutomaticMaximumBurst, costCalculation, ThrottleMode.Shaping)
/**
* Context-preserving variant of [[akka.stream.scaladsl.FlowOps.throttle]].
*
* @see [[akka.stream.scaladsl.FlowOps.throttle]]
*/
def throttle(
cost: Int,
per: FiniteDuration,
maximumBurst: Int,
costCalculation: (Out) => Int,
mode: ThrottleMode): Repr[Out, Ctx] =
via(flow.throttle(cost, per, maximumBurst, a => costCalculation(a._1), mode))
private[akka] def flow[T, C]: Flow[(T, C), (T, C), NotUsed] = Flow[(T, C)]
}