@@ -31,25 +31,26 @@ class RowDataSubscriptionSpec extends Specification {
31
31
sequential
32
32
33
33
" Positive flow" >> {
34
+ val delegate = new TestRowDataSubscriptionDelegate ()
34
35
val subscriber = new TestSubscriber ()
35
- val subscription = newSubscription(subscriber)
36
+ val subscription = newSubscription(subscriber, delegate, bufferSize = 10 )
36
37
" Subscription should call onSubscribe" in {
37
38
subscriber.subscribed must beTrue
38
39
}
39
- " nextRow should be preserved " in {
40
- subscription.nextRow( 0 )
40
+ " request should not affect on rows " in {
41
+ subscription.request( 1 )
41
42
subscriber.lastRow must_== - 1
42
- subscription.rows must haveSize(1 )
43
43
}
44
- " and send when it is requested by calling onNext " in {
45
- subscription.request( 1 )
44
+ " nextRow should be sent " in {
45
+ subscription.nextRow( 0 )
46
46
subscriber.lastRow must_== 0
47
47
}
48
- " next rows should be again preserved" in {
48
+ " next rows should be preserved" in {
49
49
subscription.nextRow(1 )
50
50
subscription.nextRow(2 )
51
51
subscription.nextRow(3 )
52
52
subscriber.lastRow must_== 0
53
+ subscription.rows must haveSize(3 )
53
54
}
54
55
" and send when it is requested but not more than the total number requested" in {
55
56
subscription.request(2 )
@@ -66,16 +67,49 @@ class RowDataSubscriptionSpec extends Specification {
66
67
}
67
68
}
68
69
70
+ " Delegate" >> {
71
+ val delegate = new TestRowDataSubscriptionDelegate ()
72
+ val subscriber = new TestSubscriber ()
73
+ val subscription = newSubscription(subscriber, delegate, bufferSize = 2 )
74
+ " delegate should not be started or paused" >> {
75
+ delegate.started must beFalse
76
+ }
77
+ " request should not cause pause" >> {
78
+ subscription.request(1 )
79
+ delegate.paused must beFalse
80
+ }
81
+ " sending 2 rows should not cause pause" >> {
82
+ subscription.nextRow(0 )
83
+ subscription.nextRow(1 )
84
+ delegate.paused must beFalse
85
+ }
86
+ " sending 3nd row should not cause pause" >> {
87
+ subscription.nextRow(1 )
88
+ delegate.paused must beTrue
89
+ }
90
+ " continue should be called after request" >> {
91
+ subscription.request(1 )
92
+ delegate.paused must beFalse
93
+ }
94
+ " cancel should be called after cancel" >> {
95
+ subscription.cancel()
96
+ delegate.cancelled must beTrue
97
+ }
98
+ }
99
+
69
100
" When it is canceled it should stop sending and preserving rows" >> {
101
+ val delegate = new TestRowDataSubscriptionDelegate ()
70
102
val subscriber = new TestSubscriber ()
71
- val subscription = newSubscription(subscriber)
103
+ val subscription = newSubscription(subscriber, delegate, bufferSize = 10 )
72
104
subscription.nextRow(0 )
73
105
subscription.request(1 )
74
106
subscription.nextRow(1 )
75
107
subscription.cancel()
76
108
subscription.nextRow(2 )
77
109
subscription.rows must haveSize(0 )
78
110
subscriber.lastRow must_== 0
111
+ delegate.started must beTrue
112
+ delegate.cancelled must beTrue
79
113
}
80
114
81
115
val attemptsCount = 1000
@@ -90,16 +124,21 @@ class RowDataSubscriptionSpec extends Specification {
90
124
" Thread safety" ! attempts {_ =>
91
125
val subscriber = new TestSubscriber ()
92
126
implicit val context = ExecutionContext .global
93
- val subscription = new RowDataSubscription (subscriber)
94
127
val count = 1000
95
- context.execute(new Runnable {
96
- override def run (): Unit = {
97
- for (row <- 0 until count) {
98
- subscription.nextRow(row)
99
- }
100
- subscription.complete()
128
+ val delegate = new TestRowDataSubscriptionDelegate () {
129
+ override def start (subscription : RowDataSubscription ): Unit = {
130
+ super .start(subscription)
131
+ context.execute(new Runnable {
132
+ override def run (): Unit = {
133
+ for (row <- 0 until count) {
134
+ subscription.nextRow(row)
135
+ }
136
+ subscription.complete()
137
+ }
138
+ })
101
139
}
102
- })
140
+ }
141
+ val subscription = new RowDataSubscription (subscriber, delegate, bufferSize = 10 )
103
142
context.execute(new Runnable {
104
143
override def run (): Unit = {
105
144
for (row <- 0 until count) {
@@ -117,39 +156,54 @@ class RowDataSubscriptionSpec extends Specification {
117
156
subscriber.lastRowError must beFalse
118
157
subscriber.completed must beTrue
119
158
subscriber.lastRow mustEqual count - 1
159
+ delegate.started must beTrue
160
+ delegate.cancelled must beFalse
120
161
}
121
162
122
163
" Thread safety for cancel" ! attempts {_ =>
123
164
val subscriber = new TestSubscriber ()
124
165
implicit val context = ExecutionContext .global
125
- val subscription = new RowDataSubscription (subscriber)
126
- val count = 1000
127
- context.execute(new Runnable {
128
- override def run (): Unit = {
129
- for (row <- 0 until count + 10 ) {
130
- subscription.nextRow(row)
131
- }
132
- subscription.complete()
133
- }
134
- })
166
+ val count = 10
135
167
val canceledPromise = Promise [Unit ]()
168
+
169
+ val delegate = new TestRowDataSubscriptionDelegate () {
170
+ override def start (subscription : RowDataSubscription ): Unit = {
171
+ super .start(subscription)
172
+ context.execute(new Runnable {
173
+ override def run (): Unit = {
174
+ for (row <- 0 until count + 10 ) {
175
+ subscription.nextRow(row)
176
+ }
177
+ subscription.complete()
178
+ }
179
+ })
180
+ }
181
+
182
+ override def cancel (subscription : RowDataSubscription ): Unit = {
183
+ super .cancel(subscription)
184
+ canceledPromise.success()
185
+ }
186
+ }
187
+ val subscription = new RowDataSubscription (subscriber, delegate, bufferSize = 1 )
136
188
context.execute(new Runnable {
137
189
override def run (): Unit = {
138
190
for (row <- 0 until count) {
139
191
subscription.request(1 )
140
192
}
141
193
subscription.cancel()
142
- canceledPromise.success()
143
194
}
144
195
})
145
196
Await .ready(canceledPromise.future, Duration (10 , TimeUnit .SECONDS ))
146
197
147
198
subscriber.lastRowError must beFalse
148
199
subscriber.completed must beFalse
149
200
subscriber.lastRow must beLessThanOrEqualTo(count - 1 )
201
+ delegate.started must beTrue
202
+ delegate.cancelled must beTrue
150
203
}
151
204
152
- def newSubscription (subscriber : TestSubscriber ) = new RowDataSubscription (subscriber)(SameThreadExecutionContext )
205
+ def newSubscription (subscriber : TestSubscriber , delegate : RowDataSubscriptionDelegate , bufferSize : Int ) =
206
+ new RowDataSubscription (subscriber, delegate, bufferSize)(SameThreadExecutionContext )
153
207
154
208
implicit def intToTestRowData (rowNumber : Int ) : RowData = TestRowData (rowNumber)
155
209
case class TestRowData (rowNumber : Int ) extends RowData {
@@ -202,4 +256,42 @@ class RowDataSubscriptionSpec extends Specification {
202
256
203
257
}
204
258
}
259
+
260
+ class TestRowDataSubscriptionDelegate extends RowDataSubscriptionDelegate {
261
+ var cancelled = false
262
+ override def cancel (subscription : RowDataSubscription ): Unit = {
263
+ if (! started) {
264
+ throw new IllegalStateException (" Not started" )
265
+ }
266
+ if (cancelled) {
267
+ throw new IllegalStateException (" Already canceled" )
268
+ }
269
+ cancelled = true
270
+ }
271
+ var paused = false
272
+ override def pause (subscription : RowDataSubscription ): Unit = {
273
+ if (paused) {
274
+ throw new IllegalStateException (" Already paused" )
275
+ }
276
+ if (! started) {
277
+ throw new IllegalStateException (" Not started" )
278
+ }
279
+ paused = true
280
+ }
281
+
282
+ override def continue (subscription : RowDataSubscription ): Unit = {
283
+ if (! paused) {
284
+ throw new IllegalStateException (" Not paused" )
285
+ }
286
+ if (! started) {
287
+ throw new IllegalStateException (" Not started" )
288
+ }
289
+ paused = false
290
+ }
291
+
292
+ var started = false
293
+ override def start (subscription : RowDataSubscription ): Unit = {
294
+ started = true
295
+ }
296
+ }
205
297
}
0 commit comments