t
scalqa

Stream._extend._flow

trait _flow[A] extends Stream.Flow._extend._flow[A]

Flow Control

Self Type
Stream[A]
Ordering
  1. Alphabetic
Inherited
  1. Stream._extend._flow
  2. Stream.Flow._extend._flow
  3. scala.AnyRef
  4. scala.Any
  1. Hide All
  2. Show All
Visibility
  1. Public
  2. All

Method

  1. final def asInstanceOf[T0]: T0
    Definition Classes
    Any
  2. def clone(): AnyRef
    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  3. final def eq(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  4. def equals(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  5. def finalize(): Unit
    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( classOf[java.lang.Throwable] )
  6. def hashCode(): Int
    Definition Classes
    AnyRef → Any
  7. final def isInstanceOf[T0]: Boolean
    Definition Classes
    Any
  8. final def ne(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  9. def noindex: ~[A]

    Loose indexing optimizations

    Loose indexing optimizations

    Many streams from indexed sources (like IndexedSeq, Array, Vector, etc) are special. They know their size and can read elements without iteration. They can optimize operations like take, dropNext, letLast, and many others

    noindex turns this privileged Streams into regular and it is needed occasionally for debugging and testing

    Definition Classes
    _flow
  10. def nosize: ~[A]

    Loose size information

    Loose size information

    Many streams return sizeOpt, knowing their current size

    nosize drops sizing information, so some optimizations will not be available

    This is primarily for testing and debgging

    Definition Classes
    _flow
  11. final def notify(): Unit
    Definition Classes
    AnyRef
  12. final def notifyAll(): Unit
    Definition Classes
    AnyRef
  13. def parallel: Stream.Flow[A]

    Parallel

    Parallel

    Returns Stream.Flow with parallel execution

    Each consecutive element will be sent to a new thread for processing

    (1 to 5).all.parallel.map("Value: " + _ + "\t" + Thread.currentThread.getName).peek(println).drain
    
    // Output
    Value: 1    ForkJoinPool.commonPool-worker-9
    Value: 3    ForkJoinPool.commonPool-worker-11
    Value: 2    main
    Value: 4    ForkJoinPool.commonPool-worker-2
    Value: 5    ForkJoinPool.commonPool-worker-4
    Definition Classes
    _flow
  14. def parallelIf(boolean: Boolean): Stream.Flow[A]

    Conditionally parallel

    Conditionally parallel

    Switches to parallel execution if boolean parameter == true

    Returns Stream.Flow, which could be implemented as sequential Stream or parallel Stream.Flow

    (1 to 50).all.parallelIf(true).isParallel   // Returns true
    
    (1 to 50).all.parallelIf(false).isParallep  // Returns false
    Definition Classes
    _flow
  15. def parallelIfOver(threshold: Int): Stream.Flow[A]

    Conditionally parallel

    Conditionally parallel

    Switches to parallel execution if number of elements exceeds threshold

    Returns Stream.Flow, which could be implemented as sequential Stream or parallel Stream.Flow

    (1 to 50).all.parallelIfOver(100).isParallel   // Returns false
    
    (1 to 200).all.parallelIfOver(100).isParallel  // Returns true
    Definition Classes
    _flow
  16. def preview: Stream[A] with Stream.Interface.Preview[A]

    Adds preview capabilities

    Adds preview capabilities

    Returns Interface.Preview, which allows to pre-load and inspect elements, even before they go through Stream

    Definition Classes
    _flow
  17. def reverse: ~[A]

    Reverse order

    Reverse order

    Re-arranges elements is reverse order

    Note: this operation is not suitable for large streams

    Definition Classes
    _flow
  18. def reverseSized(size: Int): ~[A]

    Reverse order in segments

    Reverse order in segments

    Reverses order of elements within segments of fixed size

    Use Case: Predefined Shuffle

    For testing it is often needed to get elements in random order. However it cannot be completely random, if we want to replicate the bug

    reverseSized can shuffle elements in a predefined order, given same group size

    (1 to 15).all.reverseSized(5).lp // Prints ~(5, 4, 3, 2, 1, 10, 9, 8, 7, 6, 15, 14, 13, 12, 11)
    Definition Classes
    _flow
  19. def sequential: Stream[A]

    Restore Flow to Stream

    Restore Flow to Stream

    Restores potentially parallel Flow back to Stream

    If this is already a Stream, the operation is instant, returning this

    Otherwise the operation is quite expensive

    In many cases it is advisable to consume pipeline as Flow instead of converting to Stream

    val (count, millis) = (1 to 1000).all
      .parallel                           // Switching to parallel Stream.Flow
      .peek(_ => Thread.sleep(1))         // Expensive operation
      .sequential                         // Back to Stream
      .countAndMillis
    
    println("Count = " + count + ", done in " + millis / 1000F + " secs")
    
    // Output
    Count = 1000, done in 0.224 secs
    
    // Note: We have 1000 elements each pausing for 1 millis.
    //       Without parallel processing total time would be over 1 second
    Definition Classes
    _flow_flow
  20. def shuffle: ~[A]

    Randomize order

    Randomize order

    Re-arranges elements is random order

    Note: this operation requires full buffering and is not suitable for large streams

    Definition Classes
    _flow
  21. final def synchronized[T0](arg0: ⇒ T0): T0
    Definition Classes
    AnyRef
  22. def toString(): String
    Definition Classes
    AnyRef → Any
  23. def transpose[B](implicit f: Mapping[A, ~[B]]): ~[~[B]]

    Transpose

    Transpose

    Transposes matrix where rows become columns

    def all: ~[~[Int]] = ~~(11 to 15, List(21, 22, 23, 24, 25), Vector(31, 32, 33, 34, 35))
    
    all.tp
    
    all.transpose.tp
    
    // Output
    ---------------------
    ?
    ---------------------
    ~(11, 12, 13, 14, 15)
    ~(21, 22, 23, 24, 25)
    ~(31, 32, 33, 34, 35)
    ---------------------
    
    -------------
    ?
    -------------
    ~(11, 21, 31)
    ~(12, 22, 32)
    ~(13, 23, 33)
    ~(14, 24, 34)
    ~(15, 25, 35)
    -------------
    Definition Classes
    _flow
  24. def unfold(f: Mapping[~[A], A]): ~[A]

    Lazy infinite stream

    Lazy infinite stream

    Lazily unfolds next value with a function taking all prior values

     // Unfoldifg Fibonacci Sequence
    
    (0 to 1).all.unfold(_.letLast(2).sum).letNext(20).lp
    
     // Output
     ~(0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181)
    Definition Classes
    _flow
  25. final def wait(): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  26. final def wait(arg0: Long, arg1: Int): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  27. final def wait(arg0: Long): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )

Operator

  1. final def !=(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  2. final def ##(): Int
    Definition Classes
    AnyRef → Any
  3. final def ==(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any