fix Trigger references in the stream cookbook example for digest #24317

* replace Trigger with Unit
 * cosmetic changes in docs.stream.cookbook tests
This commit is contained in:
Roman Filonenko 2018-03-03 08:22:46 +01:00
parent 1574faf180
commit 8fcf728b7a
15 changed files with 52 additions and 90 deletions

View file

@ -2,9 +2,8 @@ package docs.stream.cookbook
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
import akka.stream.scaladsl.{ Keep, Sink, Source }
import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ Graph, SourceShape }
import akka.testkit.TimingTest
import akka.{ Done, NotUsed }

View file

@ -3,16 +3,10 @@
*/
package docs.stream.cookbook
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets
import java.util.zip.GZIPOutputStream
import akka.stream.impl.io.compression.GzipCompressor
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.annotation.tailrec
import scala.concurrent.Await
import scala.concurrent.duration._

View file

@ -1,12 +1,5 @@
package docs.stream.cookbook
import java.security.MessageDigest
import akka.NotUsed
import akka.stream.{ Attributes, Outlet, Inlet, FlowShape }
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._
@ -16,28 +9,32 @@ class RecipeDigest extends RecipeSpec {
"work" in {
val data = Source(List(
ByteString("abcdbcdecdef"),
ByteString("defgefghfghighijhijkijkljklmklmnlmnomnopnopq")))
//#calculating-digest
import java.security.MessageDigest
import akka.NotUsed
import akka.stream.{ Attributes, Outlet, Inlet, FlowShape }
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString
import akka.stream.stage._
val data: Source[ByteString, NotUsed] = Source.single(ByteString("abc"))
class DigestCalculator(algorithm: String) extends GraphStage[FlowShape[ByteString, ByteString]] {
val in = Inlet[ByteString]("DigestCalculator.in")
val out = Outlet[ByteString]("DigestCalculator.out")
override val shape = FlowShape.of(in, out)
override val shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
val digest = MessageDigest.getInstance(algorithm)
private val digest = MessageDigest.getInstance(algorithm)
setHandler(out, new OutHandler {
override def onPull(): Trigger = {
pull(in)
}
override def onPull(): Unit = pull(in)
})
setHandler(in, new InHandler {
override def onPush(): Trigger = {
override def onPush(): Unit = {
val chunk = grab(in)
digest.update(chunk.toArray)
pull(in)
@ -48,25 +45,22 @@ class RecipeDigest extends RecipeSpec {
completeStage()
}
})
}
}
val digest: Source[ByteString, NotUsed] = data.via(new DigestCalculator("SHA-256"))
//#calculating-digest
Await.result(digest.runWith(Sink.head), 3.seconds) should be(
ByteString(
0x24, 0x8d, 0x6a, 0x61,
0xd2, 0x06, 0x38, 0xb8,
0xe5, 0xc0, 0x26, 0x93,
0x0c, 0x3e, 0x60, 0x39,
0xa3, 0x3c, 0xe4, 0x59,
0x64, 0xff, 0x21, 0x67,
0xf6, 0xec, 0xed, 0xd4,
0x19, 0xdb, 0x06, 0xc1))
0xba, 0x78, 0x16, 0xbf,
0x8f, 0x01, 0xcf, 0xea,
0x41, 0x41, 0x40, 0xde,
0x5d, 0xae, 0x22, 0x23,
0xb0, 0x03, 0x61, 0xa3,
0x96, 0x17, 0x7a, 0x9c,
0xb4, 0x10, 0xff, 0x61,
0xf2, 0x00, 0x15, 0xad))
}
}
}

View file

@ -4,9 +4,6 @@ import akka.stream.{ ClosedShape, OverflowStrategy }
import akka.stream.scaladsl._
import akka.stream.testkit._
import scala.collection.immutable
import scala.concurrent.Await
class RecipeDroppyBroadcast extends RecipeSpec {
"Recipe for a droppy broadcast" must {

View file

@ -3,7 +3,6 @@ package docs.stream.cookbook
import akka.NotUsed
import akka.stream.scaladsl.{ Sink, Source }
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

View file

@ -91,7 +91,7 @@ class RecipeHold extends RecipeSpec {
.run()
val subscription = sub.expectSubscription()
sub.expectNoMsg(100.millis)
sub.expectNoMessage(100.millis)
subscription.request(1)
sub.expectNext(0)
@ -121,10 +121,10 @@ class RecipeHold extends RecipeSpec {
source.via(new HoldWithWait).to(sink).run()
val subscription = sub.expectSubscription()
sub.expectNoMsg(100.millis)
sub.expectNoMessage(100.millis)
subscription.request(1)
sub.expectNoMsg(100.millis)
sub.expectNoMessage(100.millis)
pub.sendNext(1)
sub.expectNext(1)

View file

@ -1,7 +1,6 @@
package docs.stream.cookbook
import akka.NotUsed
import akka.stream.ClosedShape
import akka.stream.scaladsl._
import akka.util.ByteString

View file

@ -8,6 +8,7 @@ import scala.concurrent.duration._
class RecipeManualTrigger extends RecipeSpec {
"Recipe for triggering a stream manually" must {
type Trigger = Unit
"work" in {
@ -31,17 +32,17 @@ class RecipeManualTrigger extends RecipeSpec {
graph.run()
sub.expectSubscription().request(1000)
sub.expectNoMsg(100.millis)
sub.expectNoMessage(100.millis)
pub.sendNext(())
sub.expectNext("1")
sub.expectNoMsg(100.millis)
sub.expectNoMessage(100.millis)
pub.sendNext(())
pub.sendNext(())
sub.expectNext("2")
sub.expectNext("3")
sub.expectNoMsg(100.millis)
sub.expectNoMessage(100.millis)
pub.sendNext(())
sub.expectNext("4")
@ -71,17 +72,17 @@ class RecipeManualTrigger extends RecipeSpec {
graph.run()
sub.expectSubscription().request(1000)
sub.expectNoMsg(100.millis)
sub.expectNoMessage(100.millis)
pub.sendNext(())
sub.expectNext("1")
sub.expectNoMsg(100.millis)
sub.expectNoMessage(100.millis)
pub.sendNext(())
pub.sendNext(())
sub.expectNext("2")
sub.expectNext("3")
sub.expectNoMsg(100.millis)
sub.expectNoMessage(100.millis)
pub.sendNext(())
sub.expectNext("4")

View file

@ -4,7 +4,6 @@ import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.annotation.tailrec
import scala.concurrent.Await
import scala.concurrent.duration._

View file

@ -1,11 +1,9 @@
package docs.stream.cookbook
import akka.NotUsed
import akka.stream.{ Graph, FlowShape, Inlet, Outlet, Attributes, OverflowStrategy }
import akka.stream.scaladsl._
import scala.concurrent.{ Await, Future }
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.stream.stage.{ GraphStage, GraphStageLogic }
class RecipeReduceByKey extends RecipeSpec {

View file

@ -5,8 +5,6 @@ package docs.stream.cookbook
import akka.stream.scaladsl._
import scala.concurrent.Future
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.duration._
class RecipeSeq extends RecipeSpec {

View file

@ -7,7 +7,5 @@ trait RecipeSpec extends AkkaSpec {
implicit val m = ActorMaterializer()
type Message = String
type Trigger = Unit
type Job = String
}

View file

@ -3,7 +3,6 @@ package docs.stream.cookbook
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl._
import akka.testkit.TestProbe
import scala.concurrent.Await
import scala.concurrent.duration._