Parquet のパーティショニングによる性能最適化

この記事は Apache Drill Advent Calendar 2015 の23日目の記事です。

Drill では Parquet フォーマットを使うことによって、パーティションプルーニングによる性能上のメリットを得ることができます。パーティションプルーニングとは、アクセスするパーティションを WHERE 句の条件により自動的に絞り込む機能で、これにより処理の効率化が期待されます。

パーティションプルーニング機能を使うには、まずテーブルをパーティションに分けておく必要があります。PARTITION BY 句をつけて CTAS(CREATE TABLE AS SELECT)操作を行うことでパーティション化したテーブルを作成します。

Parquet の書き込み処理では、まずパーティションキーによるソートが行われ、パーティションカラムに新しい値を見つける毎にファイルが作られます。Drill は同じディレクトリ内にパーティション毎のファイルを作りますが、1つのファイルには1つのパーティションキーの値しか含みません。ただし、1つのパーティションキーに対してファイルが複数になることはあります。

では、例を見てみましょう。元データは最近よく使う個人情報 CSV ファイルです。

$ head -n 3 personal_information_50m.csv 
1,碓井卓也,ウスイタクヤ,0,0984963105,takuya57105@rrqec.sq,889-0507,宮崎県,延岡市,旭ケ丘,2-17,,ミヤザキケン,ノベオカシ,アサヒガオカ,2-17,,1985/08/13,29,東京都,AB,836,"FBQv,9nv"
2,脇田勇三,ワキタユウゾウ,0,0554838431,yuuzou679@ydomu.ee,400-0121,山梨県,甲斐市,牛句,4-2-12,牛句ロイヤルパレス213,ヤマナシケン,カイシ,ウシク,4-2-12,ウシクロイヤルパレス213,1978/04/28,36,沖縄県,A,862,kp1fDUPQ
3,立川円香,タチカワマドカ,1,0221247688,madoka_tachikawa@uzppymocdm.za,980-0855,宮城県,仙台市青葉区,川内澱橋通,4-1-13,川内澱橋通コーポ303,ミヤギケン,センダイシアオバク,カワウチヨドミバシドオリ,4-1-13,カワウチヨドミバシドオリコーポ303,1990/01/29,24,鹿児島県,A,881,XtFKa9kL

これを CTAS で Parquet に変換します。この例では、都道府県を格納するカラム address1 をパーティションキーに指定しています。

$ apache-drill-1.4.0/bin/sqlline -u jdbc:drill:zk=node1:5181
0: jdbc:drill:zk=node1:5181> CREATE TABLE dfs.tmp.`/personal_information/`
. . . . . . . . . . . . . .> PARTITION BY (address1)
. . . . . . . . . . . . . .> AS SELECT
. . . . . . . . . . . . . .>   CAST(columns[0] AS INT) AS id,
. . . . . . . . . . . . . .>   columns[1] AS name,
. . . . . . . . . . . . . .>   CAST(columns[3] AS INT) AS sex,
. . . . . . . . . . . . . .>   columns[4] AS phone,
. . . . . . . . . . . . . .>   columns[5] AS email,
. . . . . . . . . . . . . .>   columns[6] AS zip,
. . . . . . . . . . . . . .>   columns[7] AS address1,
. . . . . . . . . . . . . .>   columns[8] AS address2,
. . . . . . . . . . . . . .>   columns[9] AS address3,
. . . . . . . . . . . . . .>   columns[10] AS address4,
. . . . . . . . . . . . . .>   columns[11] AS address5,
. . . . . . . . . . . . . .>   TO_DATE(columns[17], 'yyyy/MM/dd') AS birth_date
. . . . . . . . . . . . . .> FROM dfs.`/personal_information_50m.csv`;
+-----------+----------------------------+
| Fragment  | Number of records written  |
+-----------+----------------------------+
| 1_4       | 5200254                    |
| 1_8       | 5200257                    |
| 1_5       | 5200239                    |
| 1_2       | 5200257                    |
| 1_7       | 5200248                    |
| 1_1       | 5277845                    |
| 1_3       | 6240297                    |
| 1_0       | 6240310                    |
| 1_6       | 6240293                    |
+-----------+----------------------------+
9 rows selected (557.927 seconds)

実行結果のテーブルを見ると、9つの Minor Fragment が並列にデータを書き出したことが読み取れます。出力先のディレクトリには以下のようにファイルができています。ファイル名先頭の「1_x」は書き出し処理を行った Minor Fragment の番号、次の「_yy」はパーティションの番号です(ここでは都道府県でパーティショニングしたので48個に分かれていますね)。

$ ls tmp/personal_information/
1_0_1.parquet   1_1_45.parquet  1_3_37.parquet  1_5_29.parquet  1_7_20.parquet
1_0_10.parquet  1_1_46.parquet  1_3_38.parquet  1_5_3.parquet   1_7_21.parquet
1_0_11.parquet  1_1_47.parquet  1_3_39.parquet  1_5_30.parquet  1_7_22.parquet
1_0_12.parquet  1_1_48.parquet  1_3_4.parquet   1_5_31.parquet  1_7_23.parquet
1_0_13.parquet  1_1_5.parquet   1_3_40.parquet  1_5_32.parquet  1_7_24.parquet
1_0_14.parquet  1_1_6.parquet   1_3_41.parquet  1_5_33.parquet  1_7_25.parquet
...

元の CSV と Parquet のファイルサイズを比べると、Parquet ファイルの圧縮効果が一目瞭然です。Parquet おそるべし!

$ du -sm personal_information_50m.csv tmp/personal_information/
12308	personal_information_50m.csv
3258	tmp/personal_information/

では WHERE 句の条件つきのクエリを実行してみます。

0: jdbc:drill:zk=node1:5181> SELECT name, address1, address2, address3, address4, address5, birth_date
. . . . . . . . . . . . . .> FROM dfs.tmp.`/personal_information/`
. . . . . . . . . . . . . .> WHERE address1 = _UTF16'神奈川県' AND
. . . . . . . . . . . . . .>       address2 = _UTF16'横須賀市' AND
. . . . . . . . . . . . . .>       birth_date > '1985-01-01'
. . . . . . . . . . . . . .> LIMIT 5;
+--------+-----------+-----------+-----------+-----------+---------------+-------------+
|  name  | address1  | address2  | address3  | address4  |   address5    | birth_date  |
+--------+-----------+-----------+-----------+-----------+---------------+-------------+
| 高見善次郎  | 神奈川県      | 横須賀市      | 馬堀海岸      | 2-5-10    |               | 1987-06-13  |
| 福田渚    | 神奈川県      | 横須賀市      | 船越町       | 2-12-8    | 船越町ステーション105  | 1986-11-24  |
| 辻清作    | 神奈川県      | 横須賀市      | 平和台       | 2-11      | 平和台テラス202     | 1985-09-20  |
| 河田果凛   | 神奈川県      | 横須賀市      | 久村        | 2-18      | ザ久村305        | 1991-08-23  |
| 尾上謙治   | 神奈川県      | 横須賀市      | グリーンハイツ   | 1-17-16   |               | 1992-04-05  |
+--------+-----------+-----------+-----------+-----------+---------------+-------------+
5 rows selected (31.381 seconds)

肝心なのは、実際にどんなプランが実行されたかです。EXPLAIN PLAN FOR もしくは Drill Web UI でクエリプランを見てみます。するとスキャン対象のファイルが「1_x_34.parquet」という名前の 9 つのみであることがわかります。「神奈川県」をキーとするデータは 34 番のパーティションに入っており、スキャン対象がきちんと絞り込まれていることを確認できました。

00-00    Screen
00-01      Project(name=[$0], address1=[$1], address2=[$2], address3=[$3], address4=[$4], address5=[$5], birth_date=[$6])
00-02        SelectionVectorRemover
00-03          Limit(fetch=[5])
00-04            UnionExchange
01-01              SelectionVectorRemover
01-02                Limit(fetch=[5])
01-03                  Project(name=[$3], address1=[$0], address2=[$1], address3=[$4], address4=[$5], address5=[$6], birth_date=[$2])
01-04                    SelectionVectorRemover
01-05                      Filter(condition=[AND(=($0, _UTF-16LE'神奈川県'), =($1, _UTF-16LE'横須賀市'), >($2, '1985-01-01'))])
01-06                        Project(address1=[$4], address2=[$6], birth_date=[$2], name=[$0], address3=[$5], address4=[$1], address5=[$3])
01-07                          Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=/tmp/personal_information/1_1_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_5_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_2_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_6_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_0_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_4_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_8_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_3_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_7_34.parquet]], selectionRoot=maprfs:/tmp/personal_information, numFiles=9, usedMetadataFile=false, columns=[`address1`, `address2`, `birth_date`, `name`, `address3`, `address4`, `address5`]]])

以前の記事でも解説したように、Parquet は Column-oriented フォーマットであるため、各ファイルのスキャンにおいても SELECT 対象のカラムの部分に絞り込んで読み込みを行っています。圧縮効果によるファイルサイズの縮小とも合わせ、大きなパフォーマンス向上の効果が見込めるでしょう。