Опции и методы Topic Reader/Writer
Reader
topic:string | TopicReaderSource | TopicReaderSource[]TopicReaderSource:{ path: string; partitionIds?: bigint[]; maxLag?: number | string | Duration; readFrom?: Date | Timestamp }
consumer:stringcodecMap?:Map<Codec | number, CompressionCodec>— дополнительные кодеки распаковкиmaxBufferBytes?:bigint— лимит внутреннего буфера (по умолчанию ~4 МБ)updateTokenIntervalMs?:number— период обновления токена (по умолчанию 60000)onPartitionSessionStart?:(session, committedOffset, { start, end }) => Promise<void | { readOffset?, commitOffset? }>onPartitionSessionStop?:(session, committedOffset) => Promise<void>onCommittedOffset?:(session, committedOffset) => void
Методы и поведение:
read({ limit?, waitMs?, signal? }):AsyncIterable<TopicMessage[]>- Возвращает последовательность батчей сообщений.
limitограничивает общее число сообщений, извлекаемых за один «пробег» итератора, чтобы контролировать задержку и память.waitMsзадаёт максимальное ожидание поступления данных; по таймауту итератор вернёт пустой батч[], что позволяет неблокирующую интеграцию в event loop.signalпозволяет прервать ожидание/чтение. - Почему так: длительные блокировки чтения мешают кооперативной многозадачности; «пустые» отдачи по таймауту упрощают планирование работы без busy‑wait.
- Возвращает последовательность батчей сообщений.
commit(messages | message):Promise<void>- Подтверждает обработку до соответствующего оффсета в каждой затронутой партиции (идемпотентно). Коммит гарантирует, что последующее чтение начнётся после подтверждённого оффсета. Можно вызывать на массиве сообщений (одного батча) или одном сообщении.
- Зачем: это реализация как минимум один раз (at‑least‑once). Коммит отделяет «прочитано» от «обработано» и позволяет безопасное восстановление.
- Перфоманс: ожидание
await commit()в горячем пути снижает пропускную способность. Допустима стратегия fire‑and‑forget (void reader.commit(batch)) c наблюдением черезonCommittedOffset.
close():Promise<void>- Завершает чтение «мягко»: перестаёт принимать новые данные, дожидается завершения ожидающих коммитов (с защитным таймаутом) и корректно останавливает фоновые задачи.
destroy(reason?):void- Немедленно останавливает все операции, отклоняет ожидающие коммиты, освобождает ресурсы.
Writer
topic:stringtx?:TX— запись внутри транзакцииproducer?:stringcodec?:CompressionCodecmaxBufferBytes?:bigint— по умолчанию 256 МБmaxInflightCount?:number— по умолчанию 1000flushIntervalMs?:number— по умолчанию 10 мсupdateTokenIntervalMs?:number— по умолчанию 60000retryConfig?(signal):RetryConfigonAck?(seqNo, status?):(seqNo: bigint, status?: 'skipped' | 'written' | 'writtenInTx') => void
Методы и поведение:
write(payload: Uint8Array, extra?):bigint- Кладёт сообщение в буфер и возвращает назначенный
seqNo. Опционально можно задатьseqNo,createdAt,metadataItems. Запись не блокирует; фактическая отправка выполняется приflush()или периодическим флашером. - Почему
seqNo: на продюсереproducerId + seqNoобеспечивает идемпотентность и детерминизм подтверждений (и упорядоченность в партиции).
- Кладёт сообщение в буфер и возвращает назначенный
flush():Promise<bigint | undefined>- Выгружает накопленные сообщения в сеть, дожидается подтверждений «в полёте» и возвращает последний
seqNo. Используйте в контрольных точках (например, при остановке сервиса).
- Выгружает накопленные сообщения в сеть, дожидается подтверждений «в полёте» и возвращает последний
close():Promise<void>- «Мягко» завершает работу: прекращает приём новых сообщений, дожидается флаша, освобождает ресурсы.
destroy():void- Немедленное прекращение без гарантии доставки.
Подтверждения:
onAck(seqNo, status): уведомляет о судьбе сообщения.status:written— записано вне транзакции;writtenInTx— записано в транзакции (станет видимым после коммита);skipped— пропущено (например, из‑за конфликтаseqNo).
Повторные попытки и устойчивость:
- Подключение к TopicService — потоковое; при обрывах переподнимается с бюджетом/стратегией из
retryConfig. Очередь команд пересоздаётся.
Транзакционные варианты:
createTopicTxReader(tx, ...)иcreateTopicTxWriter(tx, ...)привязаны к транзакции Query.- TxReader отслеживает прочитанные оффсеты и отправляет
updateOffsetsInTransactionнаtx.onCommit. - TxWriter инициирует
flushнаtx.onCommitи корректно сворачивается наtx.onRollback/onClose. - Эти объекты не реализуют
AsyncDisposable; использоватьusingдля них не нужно: жизненным циклом управляет транзакция.
- TxReader отслеживает прочитанные оффсеты и отправляет