Streaming large result sets

A nightly job touches five million rows. Loading them all into a list is a heap dump waiting to happen. Storm streams rows through a Kotlin Flow in constant memory and batches writes on the way back.

Series · The Storm way · 4 min read · Kotlin

01 The task

Export, migrate, or reprocess a table that does not fit in memory, with database-side filtering, consistent transactional semantics where needed, and efficient writes on the way back.

02 Read as a Flow

Every query builder exposes resultFlow, which hydrates rows as they arrive from the database. Structured concurrency handles the cleanup: when the Flow completes or the coroutine is cancelled, cursors and connections are released without explicit close calls.

ExportJob.kt Kotlin · Storm
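import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList

// A Flow streams rows as they arrive; memory stays constant
val users: Flow<User> = orm.entity<User>().select().resultFlow

// collect is a suspend call, so run this inside a coroutine
users.collect { user ->
    processUser(user)   // one row in memory at a time
}

// Flow operators compose lazily; nothing runs until a terminal operator.
// toList() then holds every email in memory, so reserve it for small results.
val emails: List<String> = users.map { it.email }.toList()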
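The laziness can be observed without a database. The sketch below is an analogy rather than Storm code: it uses the Kotlin stdlib `Sequence`, which, like a cold Flow, pulls one element at a time and runs nothing until a terminal operation. `fakeRows` is a hypothetical stand-in for a database cursor, and `CursorStats` exists only to count when rows are produced and how many are alive at once.

```kotlin
// Stdlib analogy for a cold, lazy Flow: operators build a pipeline and
// nothing is produced until a terminal operation pulls rows through it.
data class User(val id: Int, val email: String)

class CursorStats {
    var produced = 0      // rows pulled from the fake cursor so far
    var inFlight = 0      // rows handed downstream and not yet finished
    var peakInFlight = 0  // the most rows alive in the pipeline at once
}

// Hypothetical stand-in for a database cursor that yields rows on demand.
fun fakeRows(count: Int, stats: CursorStats): Sequence<User> = sequence {
    for (i in 1..count) {
        stats.produced++
        stats.inFlight++
        stats.peakInFlight = maxOf(stats.peakInFlight, stats.inFlight)
        yield(User(i, "user$i@example.com"))
        stats.inFlight--  // resumed: downstream has finished with this row
    }
}

// Returns (rows produced before the terminal op, emails processed, stats).
fun runDemo(rowCount: Int): Triple<Int, Int, CursorStats> {
    val stats = CursorStats()
    val emails = fakeRows(rowCount, stats)
        .filter { it.id % 2 == 0 }   // composes, does not run yet
        .map { it.email }            // composes, does not run yet
    val producedBeforeTerminal = stats.produced

    var processed = 0
    emails.forEach { processed++ }   // terminal op: rows flow one at a time
    return Triple(producedBeforeTerminal, processed, stats)
}

fun main() {
    val (before, processed, stats) = runDemo(1_000_000)
    println("produced before terminal op: $before")
    println("emails processed: $processed, peak rows in flight: ${stats.peakInFlight}")
}
```

Calling `runDemo(1_000_000)` reports zero rows produced before `forEach` starts, 500,000 emails processed, and a peak of one row in flight: the pipeline never holds more than the row currently being handled.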

Two details make this safe at scale. Storm's per-query interner retains an entity only while your code still holds a reference to it, so processed rows become eligible for garbage collection and do not accumulate. And because the Flow is lazy, operators like map and filter compose before any row loads.
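The interner behaviour can be pictured as a cache of weak references keyed by primary key. The class below is a toy model of the idea, not Storm's internals; `WeakInterner`, `KeyedRef`, and `Entity` are hypothetical names. While a caller holds an entity, hydrating the same key again returns the same instance; once nothing else references it, the garbage collector may clear it, and the `ReferenceQueue` lets the map drop the stale entry instead of growing with every row.

```kotlin
import java.lang.ref.ReferenceQueue
import java.lang.ref.WeakReference

// Hypothetical entity type for the sketch.
data class Entity(val id: Long, val name: String)

// Weak reference that remembers its key, so a cleared entry can be removed.
class KeyedRef(val key: Long, referent: Entity, queue: ReferenceQueue<Entity>) :
    WeakReference<Entity>(referent, queue)

// Toy interner (not Storm's implementation): key -> weakly held instance.
class WeakInterner {
    private val queue = ReferenceQueue<Entity>()
    private val cache = HashMap<Long, KeyedRef>()

    // Returns the live instance for this key if one is still reachable,
    // otherwise caches and returns the freshly hydrated candidate.
    fun intern(candidate: Entity): Entity {
        expungeStale()
        cache[candidate.id]?.get()?.let { return it }
        cache[candidate.id] = KeyedRef(candidate.id, candidate, queue)
        return candidate
    }

    // Removes entries whose entities the garbage collector has cleared.
    private fun expungeStale() {
        while (true) {
            val ref = queue.poll() as KeyedRef? ?: break
            // Only remove if the map still points at this exact reference.
            if (cache[ref.key] === ref) cache.remove(ref.key)
        }
    }

    val size: Int get() = cache.size
}
```

The weak references are the design choice that matters: the interner can deduplicate without ever being the reason a row stays in memory.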

03 Filter in the database

Streaming composes with the query builder, so filtering happens in the database, where it belongs, and only matching rows are streamed:

OrderJob.kt Kotlin · Storm
// Push the filtering to the database, stream what remains
val pendingOrders: Flow<Order> = orm.entity<Order>()
    .select()
    .where(Order_.status eq Status.PENDING)
    .resultFlow
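To see why the predicate belongs in the query, compare the two placements in a toy model. `FakeTable` and `OrderRow` are hypothetical, and the one-in-a-hundred pending ratio is invented for the example; the point is only that a predicate applied after streaming still transfers and hydrates every row before discarding it.

```kotlin
enum class Status { PENDING, SHIPPED }
data class OrderRow(val id: Int, val status: Status)

// Hypothetical table that counts how many rows it sends to the client.
class FakeTable(private val rows: List<OrderRow>) {
    var transferred = 0
        private set

    // Models a query: the optional predicate runs "in the database",
    // so only matching rows are counted as transferred.
    fun stream(where: (OrderRow) -> Boolean = { true }): Sequence<OrderRow> =
        rows.asSequence().filter(where).onEach { transferred++ }
}

fun main() {
    val rows = List(100_000) { i ->
        OrderRow(i, if (i % 100 == 0) Status.PENDING else Status.SHIPPED)
    }

    // Anti-pattern: stream everything, filter on the client.
    val clientSide = FakeTable(rows)
    val a = clientSide.stream().filter { it.status == Status.PENDING }.count()

    // Filter in the query, stream only the matches.
    val serverSide = FakeTable(rows)
    val b = serverSide.stream { it.status == Status.PENDING }.count()

    println("client-side filter: $a matches, ${clientSide.transferred} rows transferred")
    println("database filter:    $b matches, ${serverSide.transferred} rows transferred")
}
```

With 100,000 rows, both variants find 1,000 pending orders, but the client-side filter transfers 100,000 rows and the database-side filter transfers 1,000.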

04 Stream inside a transaction

When the job reads and writes as one atomic operation, wrap the stream in a transaction; the Flow and the updates share the same connection and commit or roll back together:

ReprocessJob.kt Kotlin · Storm
// Read and write consistently inside one transaction
transaction {
    orm.select<Order>().resultFlow.collect { order ->
        orm update order.copy(processed = true)
    }
}
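The commit-or-roll-back contract can be modelled in a few lines. This sketch captures the semantics only: `FakeConnection`, `inTransaction`, and `reprocess` are hypothetical, and the buffering connection stands in for the real JDBC connection that Storm's `transaction` manages. A failure partway through the stream discards every update made before it.

```kotlin
// Hypothetical connection that buffers writes until commit.
class FakeConnection {
    val committed = mutableListOf<String>()
    private val pending = mutableListOf<String>()

    fun write(statement: String) { pending += statement }
    fun commit() { committed += pending; pending.clear() }
    fun rollback() { pending.clear() }
}

// Runs the block on one connection: commit on success, roll back on any failure.
inline fun <T> inTransaction(conn: FakeConnection, block: (FakeConnection) -> T): T {
    try {
        val result = block(conn)
        conn.commit()
        return result
    } catch (e: Throwable) {
        conn.rollback()
        throw e  // rethrow so the caller still sees the failure
    }
}

// Reads a stream of order ids and writes one update per order, all in one transaction.
fun reprocess(conn: FakeConnection, orderIds: Sequence<Int>, failAt: Int? = null) {
    inTransaction(conn) { tx ->
        for (id in orderIds) {
            if (id == failAt) error("failed while processing order $id")
            tx.write("UPDATE orders SET processed = true WHERE id = $id")
        }
    }
}

fun main() {
    val ok = FakeConnection()
    reprocess(ok, (1..5).asSequence())
    println("committed after success: ${ok.committed.size}")

    val failed = FakeConnection()
    runCatching { reprocess(failed, (1..5).asSequence(), failAt = 3) }
    println("committed after failure: ${failed.committed.size}")
}
```

The first run commits five updates; the second throws on the third order and commits none, which is the all-or-nothing behaviour the Storm example relies on when the Flow and the updates share one transaction.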

05 The write direction

Bulk writes take lists or Flows and execute through JDBC batching, with a configurable batch size for streaming sources:

ImportJob.kt Kotlin · Storm
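import kotlinx.coroutines.flow.Flow

// The write direction: lists and Flows are written through JDBC batching
val users: List<User> = loadUsers()          // hypothetical list source
orm insert users                             // batched insert from a list

val incoming: Flow<User> = readFromKafka()   // placeholder for any Flow source
orm.entity<User>().update(incoming, batchSize = 500)   // stream in, batch out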
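Stream in, batch out amounts to grouping a lazily produced source into fixed-size chunks and sending one batch per chunk. The stdlib sketch below uses `Sequence.chunked`; `Row`, `writeInBatches`, and `sendBatch` are hypothetical, with `sendBatch` standing in for a JDBC `addBatch`/`executeBatch` round trip. It says nothing about how Storm schedules its batches internally.

```kotlin
data class Row(val id: Int)

// Groups a lazily produced stream into batches of at most batchSize rows.
// The sequence only buffers the current chunk, so memory is bounded by batchSize.
fun writeInBatches(
    source: Sequence<Row>,
    batchSize: Int,
    sendBatch: (List<Row>) -> Unit,   // hypothetical: one database round trip
): Int {
    require(batchSize > 0) { "batchSize must be positive" }
    var roundTrips = 0
    source.chunked(batchSize).forEach { batch ->
        sendBatch(batch)
        roundTrips++
    }
    return roundTrips
}

fun main() {
    val sizes = mutableListOf<Int>()
    val trips = writeInBatches((1..1_234).asSequence().map(::Row), 500) {
        sizes += it.size
    }
    println("round trips: $trips, batch sizes: $sizes")   // round trips: 3, batch sizes: [500, 500, 234]
}
```

The arithmetic is what makes batching worthwhile: at a batch size of 500, the five million rows from the introduction take 5,000,000 / 500 = 10,000 batch executions instead of five million single-row statements.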

06 Keep going

The reference documentation covers the mechanics in depth: