Skip to content

Commit 05cd33b

Browse files
committed
stream test
1 parent d6e70b9 commit 05cd33b

File tree

2 files changed

+58
-2
lines changed

2 files changed

+58
-2
lines changed

postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/DatabaseTestHelper.scala

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@ package com.github.mauricio.async.db.postgresql
1919
import java.util.concurrent.{TimeUnit, TimeoutException}
2020

2121
import com.github.mauricio.async.db.util.Log
22-
import com.github.mauricio.async.db.{Configuration, Connection}
22+
import com.github.mauricio.async.db.{RowData, Configuration, Connection}
23+
import org.reactivestreams.{Subscription, Subscriber}
2324

2425
import scala.concurrent.duration._
25-
import scala.concurrent.{Await, Future}
26+
import scala.concurrent.{Promise, Await, Future}
2627

2728
object DatabaseTestHelper {
2829
val log = Log.get[DatabaseTestHelper]
@@ -105,9 +106,44 @@ trait DatabaseTestHelper {
105106
} )
106107
}
107108

109+
def executeStream(handler: PostgreSQLConnection, statement: String, windowSize : Int = 1000, values: Array[Any] = Array.empty[Any]) : IndexedSeq[RowData] = {
110+
handleTimeout(handler, {
111+
val subscriber: TestSubscriber = new TestSubscriber
112+
handler.streamQuery(statement, windowSize).subscribe(subscriber)
113+
Await.result(subscriber.promise.future, Duration(5, SECONDS))
114+
})
115+
}
116+
108117
def await[T](future: Future[T]): T = {
109118
Await.result(future, Duration(10, TimeUnit.SECONDS))
110119
}
111120

121+
class TestSubscriber extends Subscriber[RowData] {
122+
val promise = Promise[IndexedSeq[RowData]]()
123+
override def onError(t: Throwable): Unit = {
124+
promise.failure(t)
125+
}
126+
127+
var subscription: Option[Subscription] = None
128+
var requested : Long = 0
129+
override def onSubscribe(subscription: Subscription): Unit = {
130+
this.subscription = Some(subscription)
131+
requested = 10
132+
subscription.request(10)
133+
}
134+
135+
override def onComplete(): Unit = {
136+
promise.success(rows)
137+
requested -= 1
138+
if (requested <= 2) {
139+
subscription.get.request(8)
140+
requested += 8
141+
}
142+
}
112143

144+
var rows = IndexedSeq[RowData]()
145+
override def onNext(t: RowData): Unit = {
146+
rows = rows :+ t
147+
}
148+
}
113149
}

postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnectionSpec.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,26 @@ class PostgreSQLConnectionSpec extends Specification with DatabaseTestHelper {
196196

197197
}
198198

199+
"stream a statement" in {
200+
withHandler {
201+
handler =>
202+
executeDdl(handler, this.preparedStatementCreate)
203+
executeDdl(handler, this.preparedStatementInsert, 1)
204+
executeDdl(handler, this.preparedStatementInsert2, 1)
205+
val result = executeStream(handler, this.preparedStatementSelect)
206+
207+
result.size must_== 2
208+
209+
val row = result.head
210+
row(0) === 1
211+
row(1) === "John Doe"
212+
213+
val row2 = result(1)
214+
row2(0) === 2
215+
row2(1) === "Mary Jane"
216+
}
217+
}
218+
199219
"execute a prepared statement with parameters" in {
200220

201221
withHandler {

0 commit comments

Comments
 (0)