Topic (packages/topic)
Клиенты для YDB Topics: потоковое чтение/запись, оффсеты, кодеки, транзакции.
Быстрый старт
ts
import { Driver } from '@ydbjs/core'
import { topic } from '@ydbjs/topic'
const driver = new Driver(process.env['YDB_CONNECTION_STRING']!)
await driver.ready()
const t = topic(driver)
await using reader = t.createReader({ topic: '/Root/my-topic', consumer: 'c1' })
for await (const batch of reader.read()) {
await reader.commit(batch)
}
Writer
ts
await using writer = t.createWriter({ topic: '/Root/my-topic', producer: 'p1' })
writer.write(new TextEncoder().encode('Hello'))
await writer.flush()
Примеры
Reader с батчами и таймаутами
ts
await using reader = t.createReader({ topic: '/Root/my-topic', consumer: 'svc-a' })
for await (const batch of reader.read({ limit: 100, waitMs: 1000 })) {
if (!batch.length) continue // периодический тик без блокировки цикла
// обработка батча
void reader.commit(batch) // fire-and-forget совместно с onCommittedOffset
}
Кастомные кодеки (gzip)
ts
import { Codec } from '@ydbjs/api/topic'
import * as zlib from 'node:zlib'
const MyGzip = {
codec: Codec.GZIP,
compress: (p: Uint8Array) => zlib.gzipSync(p),
decompress: (p: Uint8Array) => zlib.gunzipSync(p),
}
await using reader = t.createReader({
topic: '/Root/custom',
consumer: 'svc-a',
codecMap: new Map([[Codec.GZIP, MyGzip]]),
})
Транзакционный reader/writer
ts
import { query } from '@ydbjs/query'
import { createTopicTxReader, createTopicTxWriter } from '@ydbjs/topic'
const sql = query(driver)
await sql.begin(async (tx, signal) => {
const reader = createTopicTxReader(tx, driver, { topic: '/Root/my-topic', consumer: 'svc-a' })
for await (const batch of reader.read({ signal })) {
// внутри транзакции
}
const writer = createTopicTxWriter(tx, driver, { topic: '/Root/my-topic', producer: 'p1' })
writer.write(new TextEncoder().encode('in-tx'))
})
Подтверждения writer и seqNo
ts
await using writer = t.createWriter({ topic: '/Root/my-topic', producer: 'p1' })
writer.onAck = (seqNo, status) => console.log('ack', seqNo, status)
writer.write(new TextEncoder().encode('hello'))
await writer.flush()
Лимиты размера и inflight
ts
// Внутренне одно сообщение > 48MiB будет отклонено клиентом Topic
// Разбивайте нагрузку или используйте сжатие через кодеки
Несколько источников и фильтры партиций
ts
await using reader = t.createReader({
topic: [{ path: '/Root/topic-a', partitionIds: [0n, 1n] }, { path: '/Root/topic-b' }],
consumer: 'svc-a',
})
for await (const batch of reader.read({ waitMs: 500 })) {
// обработка сообщений с обеих тем, с фильтрацией партиций
}
Хуки сессий партиций
ts
await using reader = t.createReader({
topic: '/Root/metrics',
consumer: 'svc-a',
onPartitionSessionStart: async (session, committed, { start, end }) => {
// сдвигаем readOffset, чтобы продолжить с последнего закоммиченного
return { readOffset: committed }
},
onPartitionSessionStop: async (session, committed) => {
console.log('partition closed', session.partitionSessionId, 'committed', committed)
},
onCommittedOffset: (session, committed) => {
// наблюдаем коммиты (удобно при fire-and-forget commit())
console.log('committed', session.partitionSessionId, committed)
},
})
Временные селекторы: readFrom и maxLag
ts
await using reader = t.createReader({
topic: {
path: '/Root/events',
readFrom: new Date(Date.now() - 60_000), // последние 1 минута
maxLag: '30s', // или число миллисекунд
},
consumer: 'svc-a',
})
for await (const batch of reader.read({ waitMs: 500 })) {
// обрабатываем только свежие события
}
Корректное завершение
ts
await reader.close() // дождётся pending-коммитов с защитным таймаутом
await writer.close() // выполнит flush и дождётся подтверждений