Опции и методы Topic Reader/Writer
Reader
topic
:string | TopicReaderSource | TopicReaderSource[]
TopicReaderSource
:{ path: string; partitionIds?: bigint[]; maxLag?: number | string | Duration; readFrom?: Date | Timestamp }
consumer
:string
codecMap?
:Map<Codec | number, CompressionCodec>
— дополнительные кодеки распаковкиmaxBufferBytes?
:bigint
— лимит внутреннего буфера (по умолчанию ~4 МБ)updateTokenIntervalMs?
:number
— период обновления токена (по умолчанию 60000)onPartitionSessionStart?
:(session, committedOffset, { start, end }) => Promise<void | { readOffset?, commitOffset? }>
onPartitionSessionStop?
:(session, committedOffset) => Promise<void>
onCommittedOffset?
:(session, committedOffset) => void
Методы и поведение:
read({ limit?, waitMs?, signal? })
:AsyncIterable<TopicMessage[]>
- Возвращает последовательность батчей сообщений.
limit
ограничивает общее число сообщений, извлекаемых за один «пробег» итератора, чтобы контролировать задержку и память.waitMs
задаёт максимальное ожидание поступления данных; по таймауту итератор вернёт пустой батч[]
, что позволяет неблокирующую интеграцию в event loop.signal
позволяет прервать ожидание/чтение. - Почему так: длительные блокировки чтения мешают кооперативной многозадачности; «пустые» отдачи по таймауту упрощают планирование работы без busy‑wait.
- Возвращает последовательность батчей сообщений.
commit(messages | message)
:Promise<void>
- Подтверждает обработку до соответствующего оффсета в каждой затронутой партиции (идемпотентно). Коммит гарантирует, что последующее чтение начнётся после подтверждённого оффсета. Можно вызывать на массиве сообщений (одного батча) или одном сообщении.
- Зачем: это реализация как минимум один раз (at‑least‑once). Коммит отделяет «прочитано» от «обработано» и позволяет безопасное восстановление.
- Перфоманс: ожидание
await commit()
в горячем пути снижает пропускную способность. Допустима стратегия fire‑and‑forget (void reader.commit(batch)
) c наблюдением черезonCommittedOffset
.
close()
:Promise<void>
- Завершает чтение «мягко»: перестаёт принимать новые данные, дожидается завершения ожидающих коммитов (с защитным таймаутом) и корректно останавливает фоновые задачи.
destroy(reason?)
:void
- Немедленно останавливает все операции, отклоняет ожидающие коммиты, освобождает ресурсы.
Writer
topic
:string
tx?
:TX
— запись внутри транзакцииproducer?
:string
codec?
:CompressionCodec
maxBufferBytes?
:bigint
— по умолчанию 256 МБmaxInflightCount?
:number
— по умолчанию 1000flushIntervalMs?
:number
— по умолчанию 10 мсupdateTokenIntervalMs?
:number
— по умолчанию 60000retryConfig?(signal)
:RetryConfig
onAck?(seqNo, status?)
:(seqNo: bigint, status?: 'skipped' | 'written' | 'writtenInTx') => void
Методы и поведение:
write(payload: Uint8Array, extra?)
:bigint
- Кладёт сообщение в буфер и возвращает назначенный
seqNo
. Опционально можно задатьseqNo
,createdAt
,metadataItems
. Запись не блокирует; фактическая отправка выполняется приflush()
или периодическим флашером. - Почему
seqNo
: на продюсереproducerId + seqNo
обеспечивает идемпотентность и детерминизм подтверждений (и упорядоченность в партиции).
- Кладёт сообщение в буфер и возвращает назначенный
flush()
:Promise<bigint | undefined>
- Выгружает накопленные сообщения в сеть, дожидается подтверждений «в полёте» и возвращает последний
seqNo
. Используйте в контрольных точках (например, при остановке сервиса).
- Выгружает накопленные сообщения в сеть, дожидается подтверждений «в полёте» и возвращает последний
close()
:Promise<void>
- «Мягко» завершает работу: прекращает приём новых сообщений, дожидается флаша, освобождает ресурсы.
destroy()
:void
- Немедленное прекращение без гарантии доставки.
Подтверждения:
onAck(seqNo, status)
: уведомляет о судьбе сообщения.status
:written
— записано вне транзакции;writtenInTx
— записано в транзакции (станет видимым после коммита);skipped
— пропущено (например, из‑за конфликтаseqNo
).
Повторные попытки и устойчивость:
- Подключение к TopicService — потоковое; при обрывах переподнимается с бюджетом/стратегией из
retryConfig
. Очередь команд пересоздаётся.
Транзакционные варианты:
createTopicTxReader(tx, ...)
иcreateTopicTxWriter(tx, ...)
привязаны к транзакции Query.- TxReader отслеживает прочитанные оффсеты и отправляет
updateOffsetsInTransaction
наtx.onCommit
. - TxWriter инициирует
flush
наtx.onCommit
и корректно сворачивается наtx.onRollback/onClose
. - Эти объекты не реализуют
AsyncDisposable
; использоватьusing
для них не нужно: жизненным циклом управляет транзакция.
- TxReader отслеживает прочитанные оффсеты и отправляет