#22903 Fix code snippeds
* move scala and java sources out of paradox sources * replace relative paths with $code$ and $akka$ * fix broken includes
This commit is contained in:
parent
7bee495749
commit
81df4ff417
384 changed files with 1609 additions and 1602 deletions
|
|
@ -0,0 +1,91 @@
|
|||
package docs.stream
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream._
|
||||
import akka.stream.scaladsl._
|
||||
import akka.testkit.AkkaSpec
|
||||
|
||||
class StreamBuffersRateSpec extends AkkaSpec {
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"Demonstrate pipelining" in {
|
||||
def println(s: Any) = ()
|
||||
//#pipelining
|
||||
Source(1 to 3)
|
||||
.map { i => println(s"A: $i"); i }.async
|
||||
.map { i => println(s"B: $i"); i }.async
|
||||
.map { i => println(s"C: $i"); i }.async
|
||||
.runWith(Sink.ignore)
|
||||
//#pipelining
|
||||
}
|
||||
|
||||
"Demonstrate buffer sizes" in {
|
||||
//#materializer-buffer
|
||||
val materializer = ActorMaterializer(
|
||||
ActorMaterializerSettings(system)
|
||||
.withInputBuffer(
|
||||
initialSize = 64,
|
||||
maxSize = 64))
|
||||
//#materializer-buffer
|
||||
|
||||
//#section-buffer
|
||||
val section = Flow[Int].map(_ * 2).async
|
||||
.addAttributes(Attributes.inputBuffer(initial = 1, max = 1)) // the buffer size of this map is 1
|
||||
val flow = section.via(Flow[Int].map(_ / 2)).async // the buffer size of this map is the default
|
||||
//#section-buffer
|
||||
}
|
||||
|
||||
"buffering abstraction leak" in {
|
||||
//#buffering-abstraction-leak
|
||||
import scala.concurrent.duration._
|
||||
case class Tick()
|
||||
|
||||
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
|
||||
import GraphDSL.Implicits._
|
||||
|
||||
// this is the asynchronous stage in this graph
|
||||
val zipper = b.add(ZipWith[Tick, Int, Int]((tick, count) => count).async)
|
||||
|
||||
Source.tick(initialDelay = 3.second, interval = 3.second, Tick()) ~> zipper.in0
|
||||
|
||||
Source.tick(initialDelay = 1.second, interval = 1.second, "message!")
|
||||
.conflateWithSeed(seed = (_) => 1)((count, _) => count + 1) ~> zipper.in1
|
||||
|
||||
zipper.out ~> Sink.foreach(println)
|
||||
ClosedShape
|
||||
})
|
||||
//#buffering-abstraction-leak
|
||||
}
|
||||
|
||||
"explcit buffers" in {
|
||||
trait Job
|
||||
def inboundJobsConnector(): Source[Job, NotUsed] = Source.empty
|
||||
//#explicit-buffers-backpressure
|
||||
// Getting a stream of jobs from an imaginary external system as a Source
|
||||
val jobs: Source[Job, NotUsed] = inboundJobsConnector()
|
||||
jobs.buffer(1000, OverflowStrategy.backpressure)
|
||||
//#explicit-buffers-backpressure
|
||||
|
||||
//#explicit-buffers-droptail
|
||||
jobs.buffer(1000, OverflowStrategy.dropTail)
|
||||
//#explicit-buffers-droptail
|
||||
|
||||
//#explicit-buffers-dropnew
|
||||
jobs.buffer(1000, OverflowStrategy.dropNew)
|
||||
//#explicit-buffers-dropnew
|
||||
|
||||
//#explicit-buffers-drophead
|
||||
jobs.buffer(1000, OverflowStrategy.dropHead)
|
||||
//#explicit-buffers-drophead
|
||||
|
||||
//#explicit-buffers-dropbuffer
|
||||
jobs.buffer(1000, OverflowStrategy.dropBuffer)
|
||||
//#explicit-buffers-dropbuffer
|
||||
|
||||
//#explicit-buffers-fail
|
||||
jobs.buffer(1000, OverflowStrategy.fail)
|
||||
//#explicit-buffers-fail
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue