Skip to main content

Kafka 4.2.0 KIP-1034:內建 DLQ,終結手動錯誤處理

· 16 min read

本篇搭配的範例程式已一併放進這個 repo:examples/kafka/kip-1034-dlq-blog-post/

before/ 對應 Kafka 4.2.0 以前常見的手動 DLQ 作法,after/ 對應 Kafka 4.2.0 / KIP-1034 的內建 DLQ 作法。

Kafka Streams 應用在處理資料流時,經常需要面對不合法或無法反序列化的 record。遇到這類資料時,系統可以選擇 fail,讓應用停止;也可以選擇 continue,略過該筆資料。實務上通常還有第三種需求:保留錯誤資料,寫入另一個 topic,供後續修補、重送、追查或告警使用。這正是 DLQ(Dead Letter Queue)的用途。

問題在於,Kafka Streams 4.2.0 以前並未提供完整的內建 DLQ 寫入路徑。應用程式可以自行補上這項能力,但必須同時處理 producer lifecycle、error metadata、錯誤發生位置,以及 transaction 邊界;若啟用 exactly_once_v2,這些限制會更加明顯。

KIP-1034 補上的正是這段缺口。Kafka Streams 的內建 exception handlers 現在可以把 DLQ record 回交給框架,由框架透過既有寫入流程送出,使 DLQ 寫入得以回到 Kafka Streams 的 transactional write flow。以下以 repo 內的範例對照舊作法與新作法。

先界定問題

範例使用一個 click-events topic,內容為 JSON 字串:

{"ad_id":"banner-A","count":3}

Streams topology 會把它讀進來、deserialize 成 ClickEvent,接著做一點簡單轉換,最後寫到 click-events-output

整體流程可以想成:

click-events -> deserialize -> process -> click-events-output

若資料格式正確,處理流程相當單純。但只要出現 NOT_VALID_JSON 這類資料,Kafka Streams 在進入 processor 前就可能失敗,因為 record 必須先由 bytes 轉為應用程式期待的物件;一旦反序列化失敗,後續 processor 尚未開始執行。

因此,在 Kafka Streams 中討論 DLQ,不能只理解為「catch exception 後寫入另一個 topic」。錯誤可能發生在 topology 之內,也可能發生在 topology 之前。

case A: processing error

click-events -> deserialize -> process X -> output
|
+-> still inside topology


case B: deserialization error

click-events -> deserialize X -> process -> output
|
+-> processor 還沒開始

Kafka 4.2.0 以前:兩種 exception,兩套處理方式

先看 Kafka 4.2.0 以前常見的手動 DLQ 作法。主要負擔在於,應用程式不是只需處理單一錯誤型態,而是必須分別處理兩種不同型態的 exception。repo 裡的 before/ 將這兩種情況整理成可重現的範例,方便對照各自限制。

Processing error:發生在 topology 裡

若錯誤出現在 topology 內部,應用程式仍有機會在 DSL 轉換或 processor 中處理。這裡示範的做法,是先讓資料正常完成 deserialization,接著在 flatMap 這類 DSL 轉換中執行 business rule,並於必要時自行 try/catch

為什麼這裡用 flatMap

這個 before 範例要表達的是手動 DLQ 的處理方式:資料先正常完成 deserialization,成功資料繼續往下送;若後續 business rule 失敗,資料改送 DLQ,主流程不再產生 output。對這種「成功 1 筆、失敗 0 筆」的 flow 而言,flatMapmapValues 更自然。

也可對照後續 after 範例。KIP-1034 之後,DLQ 交回 Kafka Streams 內部處理,主流程只剩正常資料轉換,因此 ClickEventTopology.java 可直接使用 mapValues。若在這個 before 範例中直接拋出 exception,當然也可執行;但那就不是此處要示範的手動攔截與分流路徑。

這段是 ClickEventManualDlqTopology.java 的核心:

stream
.flatMap((key, event) -> {
try {
if (event.count < 0) {
throw new IllegalArgumentException("count must be non-negative");
}
String processed = "user=" + key + " clicked ad=" + event.adId + " count=" + event.count;
return Collections.singletonList(KeyValue.pair(key, processed));
} catch (Exception e) {
sendToDlq(key, event, e);
return Collections.emptyList();
}
})
.to(outputTopic);

這段 code 的重點不在 flatMap,而在 catch 裡的 sendToDlq()。這裡的錯誤已經不是 JSON parse 失敗,而是 record 成功進入 topology 後,因 business rule 不合法而被手動導向 DLQ。舊版最常見的做法,是準備一個獨立的 KafkaProducer,遇到這類錯誤就直接寫入 DLQ。

這個方式能夠運作,也可以一併補上 error metadata:

ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(dlqTopic, key, value);
dlqRecord.headers().add("error.message", cause.getMessage() != null
? cause.getMessage().getBytes() : "null".getBytes());
dlqRecord.headers().add("error.class", cause.getClass().getName().getBytes());
dlqProducer.send(dlqRecord).get();

限制在於,這個 dlqProducer 與 Kafka Streams 內部 producer 並非同一個 instance。應用程式不只要另行維護 producer,也無法把這條 DLQ 寫入納入 Kafka Streams 的同一個 transaction,因此 DLQ 寫入與主流程無法共同達成 EOS。

Deserialization error:發生在 topology 之前

deserialization error 的限制更明確。

這類錯誤不是發生在 mapflatMaptransform 等 topology 步驟內,而是在 record 被 consumer 取回後、真正進入 topology 前就已發生。換言之,processor 尚未接手該筆資料,deserialization 已經失敗。

在這種情況下,topology 內的分流手段都無法介入。flatMapsplit()branch() 均無法觸及該筆資料;可用的處理入口只剩 DeserializationExceptionHandler

Kafka Streams 原始碼中的對應位置

RecordQueue.addRawRecords() 先把 raw records 放進 queue,接著 updateHead() 會呼叫 recordDeserializer.deserialize(processorContext, raw);之後 StreamTask.process() 才從 partitionGroup.nextRecord(...) 取出 record,交給 doProcess() 傳進 source node。也就是說,deserialization 確實發生在 record 進入 topology 之前。

參考:

ManualDlqHandler.java 示範的就是這條路:

@Override
public DeserializationHandlerResponse handle(
ErrorHandlerContext context,
ConsumerRecord<byte[], byte[]> record,
Exception exception) {

sendToDlq(record, exception);
return DeserializationHandlerResponse.CONTINUE;
}

此處的限制相當直接:deserialization error 發生時,record 尚未進入 topology,因此不能用 topology 內的 routing 手段處理。context.forward() 無法使用,branch()split()flatMap 等 DSL 轉換也無法觸及該筆資料。若要寫入 DLQ,舊版常見作法仍是自行建立獨立的 KafkaProducer

除了自行送出之外,應用程式也必須自行複製原始 headers,並補上 topic / partition / offset / exception 等 metadata:

record.headers().forEach(h -> dlqRecord.headers().add(h));
dlqRecord.headers().add("__manual.error.topic", record.topic().getBytes());
dlqRecord.headers().add("__manual.error.partition",
String.valueOf(record.partition()).getBytes());
dlqRecord.headers().add("__manual.error.offset",
String.valueOf(record.offset()).getBytes());

這正是舊版作法的主要負擔:不同錯誤類型必須掛在不同處理位置,處理方式也不一致。

綜合 before/ 的兩條路徑,application 層必須自行承擔下列責任:

  • 判斷錯誤應於哪一層攔截;processing error 與 deserialization error 並非同一套寫法。
  • 另行維護 KafkaProducer,包含 lifecycle、配置與送出失敗時的處理策略。
  • 自行定義 headers 命名與需要攜帶的 metadata。
  • 在 EOS 開啟時承擔 DLQ 可能於 retry 過程中重複寫入的風險。
  • 測試往往必須圍繞 workaround 撰寫,而不是直接驗證框架行為。

舊版的問題不只是程式碼較多,而是錯誤處理、資料一致性與 observability 的責任都被推回 application 層。

真正的痛點在 tx-safe

即使接受「只能自行寫入 DLQ」這個前提,transaction 邊界仍然沒有解決。

Kafka Streams 在 EOS 模式下會自行管理 transaction。簡化後的處理流程如下:

BEGIN TX
-> consume record
-> deserialize
-> process
-> send output via RecordCollector
-> sendOffsetsToTransaction
COMMIT TX

若在此流程中另以獨立 KafkaProducer 寫入 DLQ,問題相當直接:dlqProducer 與 Kafka Streams 內部用於送出 output 的 producer 不是同一個 producer instance。既然不是同一個 producer,就無法共享同一個 Kafka transaction。

用圖看會更清楚:

Kafka Streams internal producer
-> output records
-> transaction A

manual dlqProducer
-> DLQ records
-> not part of transaction A

也就是說,手動送出的 DLQ record 不會落在 Kafka Streams 那條 transactional write path 裡。

可能發生的情境如下:

  1. 手動 DLQ producer 已經將錯誤資料送出。
  2. Kafka Streams 內部 transaction 隨後因 rebalance、crash 或其他原因 abort。
  3. Kafka Streams retry 後重新處理同一筆資料。
  4. DLQ record 再次被寫入。
caution

獨立 producer 送出的 record 不在 Kafka Streams 的 transaction 之內;abort 或 retry 不會使該筆 DLQ 寫入隨之 rollback,因此 DLQ 可能被重複寫入。這是舊版作法的根本限制:框架沒有提供正式且可納入 transaction 的 DLQ 寫入路徑。

Kafka 4.2.0 / KIP-1034:框架終於把這條路補起來

到了 Kafka 4.2.0,KIP-1034 將這件事正式納入 Kafka Streams 的 error handling flow。

核心方向是:exception handler 可以把要寫入 DLQ 的 records 回交給框架,由 Kafka Streams 透過內部 producer 送出。這項能力加在 Kafka Streams 內建的 deserialization / processing / production exception handling 流程上;本文的 after/ 範例同時示範 deserialization error 與 processing error。

這個改變的關鍵在於,只要 DLQ record 由框架送出,就能沿用 Kafka Streams 既有的 producer 與 transaction,而不必在 application 層另行建立獨立 producer。

KIP-1034 之後,topology 本身可以維持單純:

builder
.stream(inputTopic, Consumed.with(Serdes.String(), new ClickEventSerde()))
.mapValues(event -> {
if (event.count < 0) {
throw new IllegalArgumentException("count must be non-negative");
}
return "user clicked ad=" + event.adId + " count=" + event.count;
})
.to(outputTopic);

這段 ClickEventTopology.java 仍不包含任何 DLQ 相關 code;沒有手動 try/catch、沒有另行建立 producer,也沒有自行補 headers。count < 0 只是 business validation;該 exception 如何寫入 DLQ,仍由 Kafka Streams 的 processing exception handler 負責。

真正啟用 DLQ 的設定位於 App.java。DLQ topic 由下列 config 指定:

props.put(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, DLQ_TOPIC);

再搭配內建 deserialization / processing handlers:

props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueProcessingExceptionHandler.class);

這裡也是 KIP-1034 最核心的 API 變化。舊版 exception handler 的回傳值本質上只是在回答「繼續」或「失敗」;4.2.0 之後,handler 的新 Response 可以額外攜帶「需要由框架送出的 DLQ records」。也因為 handler 現在可以把 DLQ records 回交給 Kafka Streams,框架才得以透過內部 StreamsProducer / RecordCollector 送出,而不是把 DLQ 寫入責任留在 application 層。

本文範例中的 malformed JSON 會觸發 deserialization error,因此 LogAndContinueExceptionHandler 這條路徑的效果如下:

  1. ClickEventSerde 反序列化失敗時,會拋 exception。
  2. LogAndContinueExceptionHandler 會接手。
  3. 4.2.0 的 handler 可以建立 DLQ record,並交還給 Kafka Streams。
  4. Kafka Streams 透過 RecordCollectorImpl 用同一個 StreamsProducer 把 record 送出去。
  5. 因為走的是同一個 producer,DLQ 寫入也落在同一個 transaction 邊界內。

本文範例中的 count < 0 則會觸發 topology 內部的 processing error,由 LogAndContinueProcessingExceptionHandler 接手。Kafka Streams 4.2.0 的 processing.exception.handler 預設值是 LogAndFailProcessingExceptionHandler;即使設定了 errors.dead.letter.queue.topic.name,預設 handler 仍會回傳 fail。因此,若 processing error 也要「寫入 DLQ 後繼續」,必須明確設定 LogAndContinueProcessingExceptionHandler

補充一點:ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG 之所以有效,是因為內建 exception handlers 會讀取該 config,並透過 Kafka Streams 內部工具建立 DLQ record;它不是框架對所有 handler 強制套用的行為。多數情境下,內建 handler 已足以涵蓋需求;若需要自訂 DLQ record 內容,仍可改用 custom handler。

但 custom handler 不必回到舊版的手動 producer 寫法。KIP-1034 之後,exception handler 介面本身已改變:舊版 handle() 只能回傳 CONTINUE 或 FAIL;新版 handleError() 回傳 Response,其中可以攜帶 ProducerRecord 列表,由 Kafka Streams 透過同一個內部 producer 送出。

// Kafka 4.2.0 以前:若要寫入 DLQ,通常只能自行建立 producer
@Override
public DeserializationHandlerResponse handle(
ErrorHandlerContext context,
ConsumerRecord<byte[], byte[]> record,
Exception exception) {
ProducerRecord<byte[], byte[]> dlqRecord =
new ProducerRecord<>("app-dlq", record.key(), record.value());
dlqProducer.send(dlqRecord).get(); // 獨立 producer,不在 Streams tx 裡
return DeserializationHandlerResponse.CONTINUE;
}

// Kafka 4.2.0:把 DLQ record 回交給框架,走同一條 transaction 路徑
@Override
public DeserializationExceptionHandler.Response handleError(
ErrorHandlerContext context,
ConsumerRecord<byte[], byte[]> record,
Exception exception) {
ProducerRecord<byte[], byte[]> dlqRecord =
new ProducerRecord<>("app-dlq", record.key(), record.value());
return DeserializationExceptionHandler.Response.resume(List.of(dlqRecord));
}

介面差異就在於:handleError() 允許 handler 把 DLQ records 回交給框架送出,不需要在 application 層另行建立 producer。

這是 KIP-1034 最重要的差別。它不只是省去自行建立 producer 的負擔,也讓 DLQ 寫入重新納入 Kafka Streams 的一致性模型。

換言之,本文所說的「透過 config 啟用 DLQ」成立的前提,是使用內建 handler;若改用 custom deserialization / processing / production handler,errors.dead.letter.queue.topic.name 不會自動替該 handler 建立 DLQ record,handler 必須自行決定是否建立 records。不過,custom handler 仍可透過 Response.resume(...) 把 records 回交給 Kafka Streams,因此依然可以走內建寫入路徑,而不需要自行建立 producer。

若拆開 after 帶來的差異,可以整理為下列幾點:

  • 不必另行建立 DLQ producer;Kafka Streams 內部會負責送出。
  • 不必把 DLQ 邏輯放入 topology;主流程可以維持單純。
  • 不必自行補齊常見 error headers;exception / topic / partition / offset 等 metadata 由框架建立。
  • deserialization error 不再需要 application 層 workaround;框架已提供正式處理路徑。
  • processing error 也可以透過內建 handler 回交 DLQ records,不必在 topology 中手動建立 producer。
  • KIP-1034 的能力也延伸到 production exception handler;ProductionExceptionHandler 處理的是 Kafka Streams 送出到下游時的寫入錯誤。4.2.0 以前,handle() 只能回傳 CONTINUE 或 FAIL,沒有 DLQ records;4.2.0 之後,handleError() 回傳 Response,可以攜帶 DLQ records,也具備 RETRY 選項。
  • DLQ 與 exactly_once_v2 可以放在同一個 transaction 模型中理解。

整體而言,設定更集中,topology 更單純,error metadata 由框架補齊,測試也能直接驗證框架行為。更重要的是,DLQ 寫入回到 transaction 邊界內,能與 EOS 模型一致。

note

KIP-1034 只定義了「如何送出 DLQ record」以及「預設應攜帶哪些 headers」,但 DLQ topic 本身不會由 Kafka Streams 自動建立。

如果 broker 開啟 auto.create.topics.enable=true,topic 可以由 broker 的 auto-create 機制建立。但 production 環境通常不應依賴此行為:許多 cluster 會直接關閉 auto-create;即使開啟,topic 也會套用 broker 預設的 partitions、replication factor、retention、cleanup policy,未必符合 input / output / DLQ topic 的需求。

因此,repo 裡的 after/src/main/java/io/example/App.java 會先建立 click-events-dlq topic,而不是依賴 broker auto-create。

after 的資料流則比較像這樣:

                +-------------------+
| click-events |
+-------------------+
|
v
+-------------------+
| deserialize |
+-------------------+
| |
success| |error
v v
+-------------+ +------------------------------+
| process | | LogAndContinueExceptionHandler|
+-------------+ +------------------------------+
| |
v v
+-------------------+ +-------------------+
| click-events-out | | click-events-dlq |
+-------------------+ +-------------------+

both writes go through Kafka Streams internals

如果只看 transaction 邊界,after 的差異會更明顯:

+---------------- Kafka Streams transaction ----------------+
| consume -> deserialize -> process |
| | |
| +-> DLQ record via Kafka Streams |
| +-> output record via Kafka Streams |
| |
| both use the same StreamsProducer / RecordCollector |
+--------------------- commit / abort ---------------------+

不只 tx-safe,連 error headers 也一起內建

另一項實用的改變,是 error metadata 不必再由 application 層自行補齊。

在舊版手動作法中,header 名稱、內容格式,以及是否攜帶原始 topic / partition / offset,都必須由應用程式自行決定。不同團隊可能各自定義一套格式,時間一久也容易分歧。

KIP-1034 之後,框架會自動附上這些 __streams.errors.* headers:

  • __streams.errors.exception
  • __streams.errors.message
  • __streams.errors.stacktrace
  • __streams.errors.topic
  • __streams.errors.partition
  • __streams.errors.offset

這個細節之所以重要,是因為 DLQ 並不只是承接錯誤資料。實務上,後續往往還牽涉到原因追查、告警、資料回補與 replay。若這些 metadata 能由框架穩定補齊,後續處理會更一致,也更容易被測試覆蓋。

總結

項目Kafka 4.2.0 以前的手動作法Kafka 4.2.0 / KIP-1034
Processing error可以自行攔截,但通常必須自行導向 DLQ可改走 4.2.0 新增的 processing exception handler 路徑
Deserialization error只能掛 DeserializationExceptionHandler,且通常必須自行送出可透過內建 DLQ flow 處理
DLQ 寫入方式常見作法是獨立 KafkaProducer,責任在 application 層由 Kafka Streams 內部送出
Tx-safe容易在 transaction 外送出,retry 時可能重複寫入使用同一個 StreamsProducer,可納入同一個 transaction
Error headers必須自行補齊、命名與維護框架附上 __streams.errors.*
程式碼量topology、handler、headers、producer lifecycle 都要自行處理搭配內建 handler 時,主流程通常只需配置即可啟用

Kafka 4.2.0 以前的 DLQ 較像 application 層自行補出的機制;KIP-1034 之後,DLQ 才正式進入 Kafka Streams 框架,並能與 transaction 模型一併運作。

用測試看行為差異,比講概念更準

許多細節放在測試中觀察會更直接,因為 input、output、DLQ record 與 headers 都能在同一處驗證。若要對照本文提到的 routing 與 header 行為,可直接閱讀 before/after/ 的測試。

若要自行執行範例,專案目錄如下:

cd examples/kafka/kip-1034-dlq-blog-post
./gradlew test

何時應評估升級到 4.2.0?

若系統具備下列需求,KIP-1034 的價值會相當明確:

  • deserialization error 需要穩定寫入 DLQ。
  • 不希望在 topology 外另行維護 producer。
  • 已啟用 exactly_once_v2,且不希望承擔 transaction 外寫入導致的重複寫入風險。
  • 需要一致的 error headers,以支援後續告警、查錯與回補。

換言之,4.2.0 之後,DLQ 不再只是 application 層自行補上的機制,而是 Kafka Streams 框架正式承接的責任。

結語

KIP-1034 補的是 Kafka Streams 長期存在的 DLQ 缺口。

Kafka 4.2.0 以前,真正制約 DLQ 設計的是 transaction 邊界:只要使用獨立 producer 寫入 DLQ,該筆寫入就脫離 Kafka Streams 的 transaction 範圍;在 EOS 開啟時,retry 可能造成重複寫入。除此之外,錯誤攔截層級、送出方式與 headers 命名也都必須由 application 層自行維護。

4.2.0 之後,設定 StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG 並搭配內建 exception handler,錯誤資料即可交由 Kafka Streams 內部寫入 DLQ,不必另行建立 producer。對於仍在維護手動 DLQ 的 Kafka Streams 專案,KIP-1034 值得納入升級評估。

參考資料:

API Design: Use type state pattern to avoid ambiguous option flags

· One min read

For example, ZADD is a command that add member with score to sorted set, and it can accept NX or XX as option.

ZADD key [NX | XX] [GT | LT] [CH] [INCR] score member [score member
...]
  • XX: Only update elements that already exist. Don't add new elements.
  • NX: Only add new elements. Don't update already existing elements.

NX and XX can only choose one. In go-redis, this structure is used to hold arguments, but this requires extra comments and checks to tell users that NX and XX are mutually exclusive.

type ZAddArgs struct {
NX bool
XX bool
LT bool
GT bool
Ch bool
Members []Z
}

Ref: go-redis ZAddArgs

But in redis/rueidis, it provides a command builder where the type system directly prevents you from setting both NX and XX at the same time.

client.B().Zadd().Key("1").Nx().Gt().Ch().Incr().ScoreMember().ScoreMember(1, "1").ScoreMember(1, "1").Build()

Build Nested JSON in PostgreSQL

· 2 min read

Original Stackoverflow thread:

https://stackoverflow.com/questions/42222968/create-nested-json-from-sql-query-postgres-9-4/42226253#42226253

Suppose we have this tables:

person car wheel And the relation between is:

person:car = 1:N car:wheel = 1:N We need to build some nested JSON Object with SQL Query to get the summary about details of each car this person has, what would you do ?

The Goal

{
"persons": [
{
"person_name": "Johny",
"cars": [
{
"carid": 1,
"type": "Toyota",
"comment": "nice car",
"wheels": [
{
"which": "front",
"serial number": 11
},
{
"which": "back",
"serial number": 12
}
]
},
{
"carid": 2,
"type": "Fiat",
"comment": "nice car",
"wheels": [
{
"which": "front",
"serial number": 21
},
{
"which": "back",
"serial number": 22
}
]
}
]
},
{
"person_name": "Freddy",
"cars": [
{
"carid": 3,
"type": "Opel",
"comment": "nice car",
"wheels": [
{
"which": "front",
"serial number": 3
}
]
}
]
}
]
}

Approach 1 - Left Join

select
json_build_object(
'persons', json_agg(
json_build_object(
'person_name', p.name,
'cars', cars
)
)
) persons
from person p
left join (
select
personid,
json_agg(
json_build_object(
'carid', c.id,
'type', c.type,
'comment', 'nice car', -- this is constant
'wheels', wheels
)
) cars
from
car c
left join (
select
carid,
json_agg(
json_build_object(
'which', w.whichone,
'serial number', w.serialnumber
)
) wheels
from wheel w
group by 1
) w on c.id = w.carid
group by personid
) c on p.id = c.personid;

Approach 2 - Put sub-query in SELECT-List with json_build_object and json_agg

This is the SQL query based on Nico Van Belle's answer, but I replaced row_to_json with json_buid_object.

select json_build_object(
'persons', (
SELECT json_agg(
json_build_object(
'person_id',id,
'cars', (
SELECT json_agg(
json_build_object(
'car_id', car.id,
'wheels', (
SELECT json_agg(
json_build_object(
'wheel_id', wheel.id,
'whichone', wheel.whichone,
'serialnumber', wheel.serialnumber,
'car_id', wheel.carid
)
)
FROM wheel WHERE wheel.carid = car.id
)
)
) FROM car WHERE id = person.id
)
)
) FROM person
)
);

You can view the result ojnline with db<>fiddle

Why Cost is so high ?

  • Each Sub-node has to be executed N times, where N is number of person

Query Plan

Summary

I think putting sub-query in SELECT-List is elegant, but it's costly.

https://medium.com/@e850506/note-more-nested-json-5f3c1e4a87e

Use sync.Pool to reduce memory consumption

· 5 min read

Our service is like a excel document datastore. and we use xorm as ORM framework, Everytime we need to get data from DB, we call session.Find(&[]Author{}) with the slice of table beans, but this have a problem,

  • Memory allocation is very high

So every time lots of clients try to download excel file, the memory consumption is too high, and downloadling excel file takes too long to complete.

Find the root cause with pprof

I wrote a benchmark and by leveraging GO's pprof profiling tool, we can easily check out the flamegraph using some tool like pyroscope.

Here's the result we got:

CPU

Structure-Binding-cpu

Memory Allocation

Structure-Binding-mem

We can see that under the frame of (*Session).rows2Beans, except the function underneath xorm framework that we can't touch, (*Session).slice2Bean took a lot of CPU time and had lot of memory allocation.

The problem of Structure Binding

After took a look at the code in noCacheFind, I found that if we use bean (a structure with information about db schema definition) to hold the result set, xorm will call session.rows2Beans to convert rows into tableBean.

In sesson.rows2Beans(), it will:

  • convert rows to slices ([]any) by calling session.row2Slice()
  • convert []any to []bean by calling session.slice2Bean()

And this tooks a lot of time.

But I also found that if we use [][]string to hold the result set, after getting xorm.Rows (underlying data structure is database/sql.Rows), noCacheFind() will call rows.Scan for each row, so simple ! This is the chance we can make session.Find() much faster.

Step 1: Use [][]string to hold the data

Based on the assumption, we can use [][]string to reduce the cost of structure binding, you can see the benchmark below unifyContainerNoPool .

Step 2: Use sync.Pool to reduce memory allocation

But it still need huge amount of memory allocation for every []string and every [][]string, let's see how we can reduce this cost.

The solution I came out is very simple, if memory allocation is time-consuming, why don't we reuse the data structure in memory ? In this case, we're using [][]string

var unifyContainerRowPool = sync.Pool{
New: func() interface{} {
conRow := make([]string, DefaultContainerColLen)
conRow = resetUnifyContainerRow(conRow)
return conRow[:0]
},
}

var unifyContainerPool = sync.Pool{
New: func() interface{} {
// fmt.Println("New is called for unifyContainerPool")
con := make([][]string, 0, DefaultContainerRowLen)
return con
},
}

Experiment:

To demonstrate the improvement of our code, I design a simple benchmark,

There are three ways we can get data from database.

  • Use []Author to hold the data (Structure Binding)
  • Use [][]string to hold the data (Unify Container without sync.Pool)
  • Use [][]string to hold the data, and use sync.Pool to reuse [][]string (Unify Container with sync.Pool)

For row number between 1000 and 8000 to demonstrate the benefit of sync.Pool, we use runtime.NumCPU() worker to perform runtime.NumCPU()*4 jobs, every job gets all rows from the author table

$ make BENCHTIME=1s
go test -benchmem -benchtime=1s \
-bench=. \
| tee data/result_all.txt
goos: darwin
goarch: arm64
pkg: github.com/unknowntpo/playground-2022/go/xorm/unifyContainer
BenchmarkContainer/StructureBinding-1000-8 13 78949647 ns/op 91926655 B/op 3146081 allocs/op
BenchmarkContainer/UnifyContainerWithPool-1000-8 31 39028380 ns/op 31799634 B/op 1882362 allocs/op
BenchmarkContainer/UnifyContainerNoPool-1000-8 22 48651809 ns/op 48547759 B/op 2407600 allocs/op
BenchmarkContainer/StructureBinding-2000-8 8 137213729 ns/op 189730109 B/op 6284178 allocs/op
BenchmarkContainer/UnifyContainerWithPool-2000-8 15 72343683 ns/op 63592857 B/op 3759864 allocs/op
BenchmarkContainer/UnifyContainerNoPool-2000-8 12 87559920 ns/op 97780912 B/op 4807668 allocs/op
BenchmarkContainer/StructureBinding-3000-8 6 199308167 ns/op 281507561 B/op 9422225 allocs/op
BenchmarkContainer/UnifyContainerWithPool-3000-8 10 105695333 ns/op 97377107 B/op 5654077 allocs/op
BenchmarkContainer/UnifyContainerNoPool-3000-8 8 128159927 ns/op 146226483 B/op 7207695 allocs/op
BenchmarkContainer/StructureBinding-4000-8 4 256713490 ns/op 379839898 B/op12560279 allocs/op
BenchmarkContainer/UnifyContainerWithPool-4000-8 8 140550521 ns/op 129773817 B/op 7537186 allocs/op
BenchmarkContainer/UnifyContainerNoPool-4000-8 7 165150417 ns/op 195457696 B/op 9607724 allocs/op
BenchmarkContainer/StructureBinding-5000-8 4 323341906 ns/op 486299350 B/op15698332 allocs/op
BenchmarkContainer/UnifyContainerWithPool-5000-8 7 162782482 ns/op 163561488 B/op 9417513 allocs/op
BenchmarkContainer/UnifyContainerNoPool-5000-8 5 200822450 ns/op 245477224 B/op12007762 allocs/op
BenchmarkContainer/StructureBinding-6000-8 6 195379785 ns/op 195452422 B/op11307507 allocs/op
BenchmarkContainer/UnifyContainerWithPool-6000-8 6 195379785 ns/op 195452422 B/op11307507 allocs/op
BenchmarkContainer/UnifyContainerNoPool-6000-8 4 258140198 ns/op 296804806 B/op14407787 allocs/op
BenchmarkContainer/StructureBinding-7000-8 3 512568570 ns/op 720955394 B/op21974306 allocs/op
BenchmarkContainer/UnifyContainerWithPool-7000-8 4 251422083 ns/op 224965602 B/op13170581 allocs/op
BenchmarkContainer/UnifyContainerNoPool-7000-8 4 288070792 ns/op 349445756 B/op16807820 allocs/op
BenchmarkContainer/StructureBinding-8000-8 2 531542583 ns/op 792064800 B/op25112484 allocs/op
BenchmarkContainer/UnifyContainerWithPool-8000-8 4 271685614 ns/op 260817526 B/op15089126 allocs/op
BenchmarkContainer/UnifyContainerNoPool-8000-8 4 338913490 ns/op 395270596 B/op19207827 allocs/op
PASS
ok github.com/unknowntpo/playground-2022/go/xorm/unifyContainer 46.676s

The result shows that the number of allocation per operation is quite different,

The Structure Binding Method needs the largest number of allocations, and the speed is way slower that other two methods. When row number goes high, performance get worse very quickly.

The Method of using [][]string with sync.Pool on the other hand, needs smallest number of memory allocation, and compare to the one without sync.Pool, and because memory allocation takes significant amount of time, it's still faster.

Here's the plot:

perf

I put my code at the repo, please go check it out!

Optimize a PARTITION - SELECT query up to 60x faster

· 9 min read

This post demonstrates my experience of optimizing a PARTITION - SELECT query, and how I made it up to 60x faster.

Original Query and the use case

Our App is a simple excel data version control system, the data is organized by project, key and data is stored in seperated table called dbKey and dbData.

create table dbKey (
id serial ,
project_id int,
-- keys goes here
-- NOTE: key can be 1...N fields, and we use string.Join(fields, sep)
-- to handle it has the key string in backend service
name text
);
create table dbData (
id serial ,
key_id int ,
timestamp int

--- data stores at here
);

and there's also a sheet_version table that stores the version, timestamp information.

create table sheet_version (
id serial ,
version integer,
timestamp int
);

Every time we need to get specific version of data (let's say: version 2), we access sheet_version table first, and get the sheet_version.timestamp to construct the PARTITION - SELECT query.

To get the actual data, we need to do these steps:

  1. Partition the data table dbData by key_id,
  2. Rank it by timestamp (DESC), get the rank=1 datas from dbData
  3. Join dbKey and dbData back togetter.

Here's the query:

SELECT
dbKey.*, finalDBData.*
FROM
dbKey,
(
SELECT
*,
rank() OVER (PARTITION BY key_id ORDER BY TIMESTAMP DESC) AS rank
FROM
dbData where "timestamp" <= 101) finalDBData
where
dbKey.project_id = 10
and rank =1
and finalDBData.key_id = dbKey.id;

Here's the db<>fiddle you can play with this query.

info

We choose this design because it can save a lot of space to store every version of data. If version 2 has 10 keys, each key has 50 data, and if we change data under only 1 key, we only have to re-insert all data under this modified key. and only need to insert 50 data. Of course, this design has some limitations, but in this post, let's focus on the PARTITION - SELECT query optimization.

Identifying the root cause

SELECT
dbKey.*, finalDBData.*
FROM
dbKey,
(
SELECT
*,
rank() OVER (PARTITION BY key_id ORDER BY TIMESTAMP DESC) AS rank
FROM
dbData where "timestamp" <= 101) finalDBData
where rank =1
and finalDBData.key_id = dbKey.id;

Useless index and time-consuming Sequential scan

This query is slow because it has to:

  1. Scan the whole dbData table
  2. partition it by key_id, and rank the timestamp.
  3. Join it with dbKey table with rank=1 and finalDBData.key_id = dbKey.id

Planner tends to range over every row in data table to get rank=1 data because the rank=1 key_id - timestamp can be anywhere in the whole table.

This query it's so slow, we current have about 30000 keys in key table, each project has about 2000 keys, and almost 100 milion data rows in data table, it usually takes at least 60 second to get the particular version of data.

Here's the plan of this query:

------------------------------------------------------------------------------------------------------------------------------------------------------------------
Hash Join (cost=1125351.65..1289874.58 rows=5621 width=57) (actual time=9082.308..9468.256 rows=11020 loops=1)
Output: dbkey.id, dbkey.name, finaldbdata.id, finaldbdata.key_id, finaldbdata."timestamp", finaldbdata.rank
Hash Cond: (finaldbdata.key_id = dbkey.id)
Buffers: shared hit=358 read=545756, temp read=3000 written=3018
-> Subquery Scan on finaldbdata (cost=1125043.98..1289482.62 rows=5614 width=20) (actual time=9077.986..9459.255 rows=11000 loops=1)
Output: finaldbdata.id, finaldbdata.key_id, finaldbdata."timestamp", finaldbdata.rank
Filter: (finaldbdata.rank = 1)
Rows Removed by Filter: 1100200
Buffers: shared hit=274 read=545756, temp read=3000 written=3018
-> WindowAgg (cost=1125043.98..1275448.81 rows=1122705 width=20) (actual time=9077.985..9432.015 rows=1111200 loops=1)
Output: dbdata.id, dbdata.key_id, dbdata."timestamp", rank() OVER (?)
Buffers: shared hit=274 read=545756, temp read=3000 written=3018
-> Gather Merge (cost=1125043.98..1255801.47 rows=1122705 width=12) (actual time=9077.972..9174.199 rows=1111200 loops=1)
Output: dbdata.key_id, dbdata."timestamp", dbdata.id
Workers Planned: 2
Workers Launched: 2
Buffers: shared hit=274 read=545756, temp read=3000 written=3018
-> Sort (cost=1124043.95..1125213.44 rows=467794 width=12) (actual time=9060.365..9078.656 rows=370400 loops=3)
Output: dbdata.key_id, dbdata."timestamp", dbdata.id
Sort Key: dbdata.key_id, dbdata."timestamp" DESC
Sort Method: external merge Disk: 8304kB
Buffers: shared hit=274 read=545756, temp read=3000 written=3018
Worker 0: actual time=9048.365..9066.503 rows=354371 loops=1
Sort Method: external merge Disk: 7656kB
Buffers: shared hit=105 read=175482, temp read=957 written=963
Worker 1: actual time=9060.662..9079.499 rows=372284 loops=1
Sort Method: external merge Disk: 8040kB
Buffers: shared hit=105 read=180922, temp read=1005 written=1011
-> Parallel Seq Scan on public.dbdata (cost=0.00..1071990.75 rows=467794 width=12) (actual time=5.360..8698.716 rows=370400 loops=3)
Output: dbdata.key_id, dbdata."timestamp", dbdata.id
Filter: (dbdata."timestamp" <= 101)
Rows Removed by Filter: 33296333
Buffers: shared hit=192 read=545756
Worker 0: actual time=4.511..8532.085 rows=354371 loops=1
Buffers: shared hit=64 read=175482
Worker 1: actual time=3.410..8640.241 rows=372284 loops=1
Buffers: shared hit=64 read=180922
-> Hash (cost=183.41..183.41 rows=9941 width=37) (actual time=4.312..4.313 rows=10010 loops=1)
Output: dbkey.id, dbkey.name
Buckets: 16384 Batches: 1 Memory Usage: 803kB
Buffers: shared hit=84
-> Seq Scan on public.dbkey (cost=0.00..183.41 rows=9941 width=37) (actual time=0.007..1.395 rows=10010 loops=1)

And you can also view it on explain.dalibo.com

Approach 1: Materialized View

We can use materialized view to cache the result set of particalar version of data, but the first one who needs to get data still suffers from the slow query.

Improvement: Index-Only Scan

But this query still can be better. There's a new feature introduced in PostgreSQL 9.2, which allow us to get data from index itself, without touching the actual table data.

The documentation stats that There are two fundamental restrictions on when this method can be used:

  1. The index type must support index-only scans. B-tree indexes always do. GiST and SP-GiST indexes support index-only scans for some operator classes but not others. Other index types have no support. The underlying requirement is that the index must physically store, or else be able to reconstruct, the original data value for each index entry. As a counterexample, GIN indexes cannot support index-only scans because each index entry typically holds only part of the original data value.
  2. The query must reference only columns stored in the index. For example, given an index on columns x and y of a table that also has a column z, these queries could use index-only scans:
  3. If these two fundamental requirements are met, then all the data values required by the query are available from the index, so an index-only scan is physically possible. But there is an additional requirement for any table scan in PostgreSQL: it must verify that each retrieved row be "visible" to the query's MVCC snapshot, as discussed in Chapter 13. Visibility information is not stored in index entries, only in heap entries; so at first glance it would seem that every row retrieval would require a heap access anyway. And this is indeed the case, if the table row has been modified recently. However, for seldom-changing data there is a way around this problem. PostgreSQL tracks, for each page in a table's heap, whether all rows stored in that page are old enough to be visible to all current and future transactions. This information is stored in a bit in the table's visibility map. An index-only scan, after finding a candidate index entry, checks the visibility map bit for the corresponding heap page. If it's set, the row is known visible and so the data can be returned with no further work. If it's not set, the heap entry must be visited to find out whether it's visible, so no performance advantage is gained over a standard index scan. Even in the successful case, this approach trades visibility map accesses for heap accesses; but since the visibility map is four orders of magnitude smaller than the heap it describes, far less physical I/O is needed to access it. In most situations the visibility map remains cached in memory all the time.

Let's verify if all these restrictions is satisfied:

info
  1. It's staicfied because we are using B-tree index.
  2. It's satisfied by modifying our SQL query,
  3. It means all data in that page must be visible in visibility map, and it's also satisfied because the data is append-only.

We can build the map between key_id and the rank=1 timestamp first,

WITH map AS (
SELECT
DISTINCT(key_id),
timestamp
FROM (
SELECT
key_id,
timestamp,
rank() OVER (PARTITION BY key_id ORDER BY TIMESTAMP DESC) AS rank
FROM
dbData
-- filtering stuff depends on business logic
where "timestamp" <= 10000 and key_id < 100
) sub WHERE rank = 1)
SELECT * FROM map;

Result will be like:

 key_id | timestamp
--------+-----------
1 | 10000
2 | 300
3 | 6000
4 | 90303

And then, get actual data from dbData with specific key_id and timestamp pair.

WITH map AS (
SELECT
DISTINCT(key_id),
timestamp
FROM (
SELECT
key_id,
timestamp,
rank() OVER (PARTITION BY key_id ORDER BY TIMESTAMP DESC) AS rank
FROM
dbData
-- filtering stuff depends on business logic
where "timestamp" <= 10000 and key_id < 100
) sub WHERE rank = 1)
SELECT
dbKey.*, dbData.*
FROM
dbKey
INNER JOIN map m ON m.key_id = dbKey.id
INNER JOIN dbData ON dbData.key_id = m.key_id AND m.timestamp = dbData.timestamp;

The reason we build the map first is that the SELECT list in map are all stored in the index, which satisfied requirement 2 in the documentation, and later when we query dbData , we can still have Index Scan.

Here's the example

note

UPDATE: The key_id in map should be unique, or there will be duplicated keys with same timestamp, so I added DISTINCT(key_id) to the map query.

Final choice: I want them all!

We decided to use this optimized query to build the materialized view, and maintain a materialized view (we call it mat_view for short) management system to organize the creation, deletion of these mat_views.

Reference:

ChatGPT First Glance

· One min read

This is my first glance of ChatGPT, and I ask her to generate a peice of code in Haskell, which can map a function to a list.

The result she generated is totally correct, and can be run in playground.

addOneToEach :: [Int] -> [Int]
addOneToEach xs = map (+1) xs

myMap :: (a -> b) -> [a] -> [b]
myMap _ [] = []
myMap f (x:xs) = f x : myMap f xs

main = do
let myList = [1, 2, 3, 4]
let doubledList = myMap (*2) myList
print doubledList
-- Output: [2,4,6,8]

Here's the link to our chat: https://sharegpt.com/c/yedzb1N

My First Post

· One min read

Inline Formula: E=iEi=E1+E2+E3+\mathbf{E}=\sum_{i} \mathbf{E}_{i}=\mathbf{E}_{1}+\mathbf{E}_{2}+\mathbf{E}_{3}+\cdots

SELECT 'hello-world' FROM me

Block Formula:

a=b+cd+e=fa=b+c \\ d+e=f a=b+cd+e=fa=b+c \\ d+e=f

Test IFrame