trait _flow[A] extends Stream.Flow._extend._flow[A]
- Alphabetic
- Stream._extend._flow
- Stream.Flow._extend._flow
- scala.AnyRef
- scala.Any
- Hide All
- Show All
- Public
- All
Method
-
final
def
asInstanceOf[T0]: T0
- Definition Classes
- Any
-
def
clone(): AnyRef
- Attributes
- protected[java.lang]
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
final
def
eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
-
def
equals(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
def
finalize(): Unit
- Attributes
- protected[java.lang]
- Definition Classes
- AnyRef
- Annotations
- @throws( classOf[java.lang.Throwable] )
-
def
hashCode(): Int
- Definition Classes
- AnyRef → Any
-
final
def
isInstanceOf[T0]: Boolean
- Definition Classes
- Any
-
final
def
ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
-
def
noindex: ~[A]
Loose indexing optimizations
Loose indexing optimizations
Many streams from indexed sources (like IndexedSeq, Array, Vector, etc) are special. They know their size and can read elements without iteration. They can optimize operations like take, dropNext, letLast, and many others
noindex
turns this privileged Streams into regular and it is needed occasionally for debugging and testing- Definition Classes
- _flow
-
def
nosize: ~[A]
Loose size information
Loose size information
Many streams return sizeOpt, knowing their current size
nosize
drops sizing information, so some optimizations will not be availableThis is primarily for testing and debgging
- Definition Classes
- _flow
-
final
def
notify(): Unit
- Definition Classes
- AnyRef
-
final
def
notifyAll(): Unit
- Definition Classes
- AnyRef
-
def
parallel: Stream.Flow[A]
Parallel
Parallel
Returns Stream.Flow with parallel execution
Each consecutive element will be sent to a new thread for processing
(1 to 5).all.parallel.map("Value: " + _ + "\t" + Thread.currentThread.getName).peek(println).drain // Output Value: 1 ForkJoinPool.commonPool-worker-9 Value: 3 ForkJoinPool.commonPool-worker-11 Value: 2 main Value: 4 ForkJoinPool.commonPool-worker-2 Value: 5 ForkJoinPool.commonPool-worker-4
- Definition Classes
- _flow
-
def
parallelIf(boolean: Boolean): Stream.Flow[A]
Conditionally parallel
Conditionally parallel
Switches to parallel execution if boolean parameter == true
Returns Stream.Flow, which could be implemented as sequential Stream or parallel Stream.Flow
(1 to 50).all.parallelIf(true).isParallel // Returns true (1 to 50).all.parallelIf(false).isParallep // Returns false
- Definition Classes
- _flow
-
def
parallelIfOver(threshold: Int): Stream.Flow[A]
Conditionally parallel
Conditionally parallel
Switches to parallel execution if number of elements exceeds threshold
Returns Stream.Flow, which could be implemented as sequential Stream or parallel Stream.Flow
(1 to 50).all.parallelIfOver(100).isParallel // Returns false (1 to 200).all.parallelIfOver(100).isParallel // Returns true
- Definition Classes
- _flow
-
def
preview: Stream[A] with Stream.Interface.Preview[A]
Adds preview capabilities
Adds preview capabilities
Returns Interface.Preview, which allows to pre-load and inspect elements, even before they go through Stream
- Definition Classes
- _flow
-
def
reverse: ~[A]
Reverse order
Reverse order
Re-arranges elements is reverse order
Note: this operation is not suitable for large streams
- Definition Classes
- _flow
-
def
reverseSized(size: Int): ~[A]
Reverse order in segments
Reverse order in segments
Reverses order of elements within segments of fixed size
Use Case: Predefined Shuffle
For testing it is often needed to get elements in random order. However it cannot be completely random, if we want to replicate the bug
reverseSized can shuffle elements in a predefined order, given same group size
(1 to 15).all.reverseSized(5).lp // Prints ~(5, 4, 3, 2, 1, 10, 9, 8, 7, 6, 15, 14, 13, 12, 11)
- Definition Classes
- _flow
-
def
sequential: Stream[A]
Restores potentially parallel Flow back to Stream
If this is already a Stream, the operation is instant, returning this
Otherwise the operation is quite expensive
In many cases it is advisable to consume pipeline as Flow instead of converting to Stream
val (count, millis) = (1 to 1000).all .parallel // Switching to parallel Stream.Flow .peek(_ => Thread.sleep(1)) // Expensive operation .sequential // Back to Stream .countAndMillis println("Count = " + count + ", done in " + millis / 1000F + " secs") // Output Count = 1000, done in 0.224 secs // Note: We have 1000 elements each pausing for 1 millis. // Without parallel processing total time would be over 1 second
-
def
shuffle: ~[A]
Randomize order
Randomize order
Re-arranges elements is random order
Note: this operation requires full buffering and is not suitable for large streams
- Definition Classes
- _flow
-
final
def
synchronized[T0](arg0: ⇒ T0): T0
- Definition Classes
- AnyRef
-
def
toString(): String
- Definition Classes
- AnyRef → Any
-
def
transpose[B](implicit f: Mapping[A, ~[B]]): ~[~[B]]
Transpose
Transpose
Transposes matrix where rows become columns
def all: ~[~[Int]] = ~~(11 to 15, List(21, 22, 23, 24, 25), Vector(31, 32, 33, 34, 35)) all.tp all.transpose.tp // Output --------------------- ? --------------------- ~(11, 12, 13, 14, 15) ~(21, 22, 23, 24, 25) ~(31, 32, 33, 34, 35) --------------------- ------------- ? ------------- ~(11, 21, 31) ~(12, 22, 32) ~(13, 23, 33) ~(14, 24, 34) ~(15, 25, 35) -------------
- Definition Classes
- _flow
-
def
unfold(f: Mapping[~[A], A]): ~[A]
Lazy infinite stream
Lazy infinite stream
Lazily unfolds next value with a function taking all prior values
// Unfoldifg Fibonacci Sequence (0 to 1).all.unfold(_.letLast(2).sum).letNext(20).lp // Output ~(0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181)
- Definition Classes
- _flow
-
final
def
wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
final
def
wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
final
def
wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
Flow Control