forked from politrons/reactiveScala
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Scheduler.scala
68 lines (57 loc) · 2.38 KB
/
Scheduler.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package app.impl.rx
import java.util.concurrent.Executors
import app.impl.Generic
import org.junit.Test
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.ExecutionContextScheduler
import scala.concurrent.ExecutionContext;
/**
  * Demonstrates how an RxScala scheduler decides which thread executes a pipeline.
  *
  * `subscribeOn` moves the whole pipeline, starting at subscription time, onto the given scheduler.
  * `observeOn` only moves the steps that are declared after it.
  *
  * Every pipeline is consumed with `toBlocking.foreach`, so each test waits until the asynchronous
  * work has finished. With a plain `subscribe` the test method could return before the worker
  * thread printed anything, and a failure on the worker thread would never reach the test.
  */
class Scheduler extends Generic[String, Long] {

  // Kept as its own reference: the ExecutionContext created from it exposes no shutdown method.
  private val schedulerPool = Executors.newSingleThreadExecutor()

  val executor = ExecutionContext.fromExecutor(schedulerPool)
  val scheduler = ExecutionContextScheduler(executor)

  /** Stops the worker thread backing [[scheduler]] so it does not outlive the test instance. */
  @org.junit.After def releaseSchedulerPool(): Unit = schedulerPool.shutdown()

  /**
    * subscribeOn specifies the thread in which the whole pipeline is executed once an observer
    * subscribes to it.
    *
    * Expected output (the pool number depends on how many pools the JVM created before), e.g.:
    *
    * Thread out of the pipeline:main
    * Thread in pipeline:pool-1-thread-1
    */
  @Test def subscribeOn(): Unit = {
    addHeader("subscribeOn observable")
    println(s"Thread out of the pipeline:${Thread.currentThread().getName}")
    Observable.just("Hello async scala world")
      .subscribeOn(scheduler)
      .doOnNext(_ => println(s"Thread in pipeline:${Thread.currentThread().getName}"))
      // Block the test thread until the pipeline completes.
      .toBlocking
      .foreach(item => println(item))
  }

  /**
    * observeOn establishes that, from the point where it is declared, the remaining steps of the
    * pipeline process the items in the thread of the given scheduler.
    *
    * Expected threads per step when this test runs on its own, e.g.:
    *
    * 1 doOnNext --> main (the subscribing thread)
    * 2 doOnNext --> pool-2-thread-1
    * 3 doOnNext --> pool-3-thread-1
    */
  @Test def observerOn(): Unit = {
    // One dedicated single-thread pool per observeOn hop, created in this order so the
    // generated pool names stay stable.
    val secondStepPool = Executors.newSingleThreadExecutor()
    val thirdStepPool = Executors.newSingleThreadExecutor()
    val secondStepScheduler = ExecutionContextScheduler(ExecutionContext.fromExecutor(secondStepPool))
    val thirdStepScheduler = ExecutionContextScheduler(ExecutionContext.fromExecutor(thirdStepPool))
    addHeader("observerOn observable")
    try {
      Observable.just("hello async scala world")
        .doOnNext(_ => println(s"1 Step-Thread in pipeline:${Thread.currentThread().getName}"))
        .observeOn(secondStepScheduler)
        .doOnNext(_ => println(s"2 Step-Thread in pipeline:${Thread.currentThread().getName}"))
        .observeOn(thirdStepScheduler)
        .doOnNext(_ => println(s"3 Step-Thread in pipeline:${Thread.currentThread().getName}"))
        // Block the test thread until the pipeline completes.
        .toBlocking
        .foreach(item => println(item))
    } finally {
      // Graceful shutdown: already submitted work may finish, the worker threads then exit.
      secondStepPool.shutdown()
      thirdStepPool.shutdown()
    }
  }
}