Parquet のパーティショニングによる性能最適化

この記事は Apache Drill Advent Calendar 2015 の23日目の記事です。

Drill では Parquet フォーマットを使うことによって、パーティションプルーニングによる性能上のメリットを得ることができます。パーティションプルーニングとは、アクセスするパーティションを WHERE 句の条件により自動的に絞り込む機能で、これにより処理の効率化が期待されます。

パーティションプルーニング機能を使うには、まずテーブルをパーティションに分けておく必要があります。PARTITION BY 句をつけて CTAS(CREATE TABLE AS SELECT)操作を行うことでパーティション化したテーブルを作成します。

Parquet の書き込み処理では、まずパーティションキーによるソートが行われ、パーティションカラムに新しい値を見つける毎にファイルが作られます。Drill は同じディレクトリ内にパーティション毎のファイルを作りますが、1つのファイルには1つのパーティションキーの値しか含みません。ただし、1つのパーティションキーに対してファイルが複数になることはあります。

では、例を見てみましょう。元データは最近よく使う個人情報 CSV ファイルです。

$ head -n 3 personal_information_50m.csv 
1,碓井卓也,ウスイタクヤ,0,0984963105,takuya57105@rrqec.sq,889-0507,宮崎県,延岡市,旭ケ丘,2-17,,ミヤザキケン,ノベオカシ,アサヒガオカ,2-17,,1985/08/13,29,東京都,AB,836,"FBQv,9nv"
2,脇田勇三,ワキタユウゾウ,0,0554838431,yuuzou679@ydomu.ee,400-0121,山梨県,甲斐市,牛句,4-2-12,牛句ロイヤルパレス213,ヤマナシケン,カイシ,ウシク,4-2-12,ウシクロイヤルパレス213,1978/04/28,36,沖縄県,A,862,kp1fDUPQ
3,立川円香,タチカワマドカ,1,0221247688,madoka_tachikawa@uzppymocdm.za,980-0855,宮城県,仙台市青葉区,川内澱橋通,4-1-13,川内澱橋通コーポ303,ミヤギケン,センダイシアオバク,カワウチヨドミバシドオリ,4-1-13,カワウチヨドミバシドオリコーポ303,1990/01/29,24,鹿児島県,A,881,XtFKa9kL

これを CTAS で Parquet に変換します。この例では、都道府県を格納するカラム address1 をパーティションキーに指定しています。

$ apache-drill-1.4.0/bin/sqlline -u jdbc:drill:zk=node1:5181
0: jdbc:drill:zk=node1:5181> CREATE TABLE dfs.tmp.`/personal_information/`
. . . . . . . . . . . . . .> PARTITION BY (address1)
. . . . . . . . . . . . . .> AS SELECT
. . . . . . . . . . . . . .>   CAST(columns[0] AS INT) AS id,
. . . . . . . . . . . . . .>   columns[1] AS name,
. . . . . . . . . . . . . .>   CAST(columns[3] AS INT) AS sex,
. . . . . . . . . . . . . .>   columns[4] AS phone,
. . . . . . . . . . . . . .>   columns[5] AS email,
. . . . . . . . . . . . . .>   columns[6] AS zip,
. . . . . . . . . . . . . .>   columns[7] AS address1,
. . . . . . . . . . . . . .>   columns[8] AS address2,
. . . . . . . . . . . . . .>   columns[9] AS address3,
. . . . . . . . . . . . . .>   columns[10] AS address4,
. . . . . . . . . . . . . .>   columns[11] AS address5,
. . . . . . . . . . . . . .>   TO_DATE(columns[17], 'yyyy/MM/dd') AS birth_date
. . . . . . . . . . . . . .> FROM dfs.`/personal_information_50m.csv`;
+-----------+----------------------------+
| Fragment  | Number of records written  |
+-----------+----------------------------+
| 1_4       | 5200254                    |
| 1_8       | 5200257                    |
| 1_5       | 5200239                    |
| 1_2       | 5200257                    |
| 1_7       | 5200248                    |
| 1_1       | 5277845                    |
| 1_3       | 6240297                    |
| 1_0       | 6240310                    |
| 1_6       | 6240293                    |
+-----------+----------------------------+
9 rows selected (557.927 seconds)

実行結果のテーブルを見ると、9つの Minor Fragment が並列にデータを書き出したことが読み取れます。出力先のディレクトリには以下のようにファイルができています。ファイル名先頭の「1_x」は書き出し処理を行った Minor Fragment の番号、次の「_yy」はパーティションの番号です(ここでは都道府県でパーティショニングしたので48個に分かれていますね)。

$ ls tmp/personal_information/
1_0_1.parquet   1_1_45.parquet  1_3_37.parquet  1_5_29.parquet  1_7_20.parquet
1_0_10.parquet  1_1_46.parquet  1_3_38.parquet  1_5_3.parquet   1_7_21.parquet
1_0_11.parquet  1_1_47.parquet  1_3_39.parquet  1_5_30.parquet  1_7_22.parquet
1_0_12.parquet  1_1_48.parquet  1_3_4.parquet   1_5_31.parquet  1_7_23.parquet
1_0_13.parquet  1_1_5.parquet   1_3_40.parquet  1_5_32.parquet  1_7_24.parquet
1_0_14.parquet  1_1_6.parquet   1_3_41.parquet  1_5_33.parquet  1_7_25.parquet
...

元の CSV と Parquet のファイルサイズを比べると、Parquet ファイルの圧縮効果が一目瞭然です。Parquet おそるべし!

$ du -sm personal_information_50m.csv tmp/personal_information/
12308	personal_information_50m.csv
3258	tmp/personal_information/

では WHERE 句の条件つきのクエリを実行してみます。

0: jdbc:drill:zk=node1:5181> SELECT name, address1, address2, address3, address4, address5, birth_date
. . . . . . . . . . . . . .> FROM dfs.tmp.`/personal_information/`
. . . . . . . . . . . . . .> WHERE address1 = _UTF16'神奈川県' AND
. . . . . . . . . . . . . .>       address2 = _UTF16'横須賀市' AND
. . . . . . . . . . . . . .>       birth_date > '1985-01-01'
. . . . . . . . . . . . . .> LIMIT 5;
+--------+-----------+-----------+-----------+-----------+---------------+-------------+
|  name  | address1  | address2  | address3  | address4  |   address5    | birth_date  |
+--------+-----------+-----------+-----------+-----------+---------------+-------------+
| 高見善次郎  | 神奈川県      | 横須賀市      | 馬堀海岸      | 2-5-10    |               | 1987-06-13  |
| 福田渚    | 神奈川県      | 横須賀市      | 船越町       | 2-12-8    | 船越町ステーション105  | 1986-11-24  |
| 辻清作    | 神奈川県      | 横須賀市      | 平和台       | 2-11      | 平和台テラス202     | 1985-09-20  |
| 河田果凛   | 神奈川県      | 横須賀市      | 久村        | 2-18      | ザ久村305        | 1991-08-23  |
| 尾上謙治   | 神奈川県      | 横須賀市      | グリーンハイツ   | 1-17-16   |               | 1992-04-05  |
+--------+-----------+-----------+-----------+-----------+---------------+-------------+
5 rows selected (31.381 seconds)

肝心なのは、実際にどんなプランが実行されたかです。EXPLAIN PLAN FOR もしくは Drill Web UI でクエリプランを見てみます。するとスキャン対象のファイルが「1_x_34.parquet」という名前の 9 つのみであることがわかります。「神奈川県」をキーとするデータは 34 番のパーティションに入っており、スキャン対象がきちんと絞り込まれていることを確認できました。

00-00    Screen
00-01      Project(name=[$0], address1=[$1], address2=[$2], address3=[$3], address4=[$4], address5=[$5], birth_date=[$6])
00-02        SelectionVectorRemover
00-03          Limit(fetch=[5])
00-04            UnionExchange
01-01              SelectionVectorRemover
01-02                Limit(fetch=[5])
01-03                  Project(name=[$3], address1=[$0], address2=[$1], address3=[$4], address4=[$5], address5=[$6], birth_date=[$2])
01-04                    SelectionVectorRemover
01-05                      Filter(condition=[AND(=($0, _UTF-16LE'神奈川県'), =($1, _UTF-16LE'横須賀市'), >($2, '1985-01-01'))])
01-06                        Project(address1=[$4], address2=[$6], birth_date=[$2], name=[$0], address3=[$5], address4=[$1], address5=[$3])
01-07                          Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=/tmp/personal_information/1_1_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_5_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_2_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_6_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_0_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_4_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_8_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_3_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_7_34.parquet]], selectionRoot=maprfs:/tmp/personal_information, numFiles=9, usedMetadataFile=false, columns=[`address1`, `address2`, `birth_date`, `name`, `address3`, `address4`, `address5`]]])

以前の記事でも解説したように、Parquet は Column-oriented フォーマットであるため、各ファイルのスキャンにおいても SELECT 対象のカラムの部分に絞り込んで読み込みを行っています。圧縮効果によるファイルサイズの縮小とも合わせ、大きなパフォーマンス向上の効果が見込めるでしょう。

Drill Web UI のビジュアルなクエリプロファイル

この記事は Apache Drill Advent Calendar 2015 の22日目の記事です。

Drill のパフォーマンスチューニングに役立つ情報の一つはクエリプラン、そしてもう一つはクエリプロファイルです。今回はクエリプロファイルでどんな情報が見られるかを紹介していきましょう。

今回使うサンプルデータはこんな感じです。まずは 5,000万人分の個人情報が入った CSV ファイル約13GB(ダミー情報です)。

$ head -n 3 personal_information_50m.csv 
1,碓井卓也,ウスイタクヤ,0,0984963105,takuya57105@rrqec.sq,889-0507,宮崎県,延岡市,旭ケ丘,2-17,,ミヤザキケン,ノベオカシ,アサヒガオカ,2-17,,1985/08/13,29,東京都,AB,836,"FBQv,9nv"
2,脇田勇三,ワキタユウゾウ,0,0554838431,yuuzou679@ydomu.ee,400-0121,山梨県,甲斐市,牛句,4-2-12,牛句ロイヤルパレス213,ヤマナシケン,カイシ,ウシク,4-2-12,ウシクロイヤルパレス213,1978/04/28,36,沖縄県,A,862,kp1fDUPQ
3,立川円香,タチカワマドカ,1,0221247688,madoka_tachikawa@uzppymocdm.za,980-0855,宮城県,仙台市青葉区,川内澱橋通,4-1-13,川内澱橋通コーポ303,ミヤギケン,センダイシアオバク,カワウチヨドミバシドオリ,4-1-13,カワウチヨドミバシドオリコーポ303,1990/01/29,24,鹿児島県,A,881,XtFKa9kL

そしてゆるキャラ情報が入った JSON ファイル約1KB。

$ cat yuru.json 
{
  "address1":"熊本県",
  "address2":"",
  "character":"くまモン",
}
{
  "address1":"千葉県",
  "address2":"船橋市",
  "character":"ふなっしー",
}
{
  "address1":"奈良県",
  "address2":"",
  "character":"せんとくん"
}
...

これを3ノードからなる Hadoop分散ファイルシステムに配置し、各ノードでは Drillbit を稼働しておきます。クエリは、ゆるキャラごとにその住所に住んでいる人数をカウントするというシンプルな処理です。

SELECT
  b.`character` AS `ゆるキャラ`,
  b.address1 as `都道府県`,
  b.address2 as `市区町村`,
  count(*) as `人数`
FROM
  dfs.`/personal_information_50m.csv` AS a
JOIN
  dfs.`/yuru.json` AS b
ON
  (b.address1 = a.columns[7]) AND
  (b.address2 = a.columns[8] OR b.address2 = '') AND
  b.`character` <> ''
GROUP BY b.`character`, b.address1, b.address2;

まず Drill の Web UI の「Query」タブでこの SQL を Submit します。

f:id:nagixx:20151222192355p:plain

クエリの実行が始まり、しばらくすると結果が表示されます。

f:id:nagixx:20151222192744p:plain

ここで「Profiles」タブをクリックすると、完了したクエリの一覧が表示されます。ただし、表示されるクエリはアクセスしている Web UI のノードが Foreman(クエリを受け付けたノード)となったクエリだけですので、それが必ずしもクエリを投入したノードになるとは限らないことにご注意を。対象のクエリが表示されなかったら、ほかの Drillbit ノードの UI も見てみましょう。

f:id:nagixx:20151222193619p:plain

ここで対象のクエリをクリックすると、そのクエリのプロファイル情報が表示されます。ここには大きく分けて次のような情報があります。

  • クエリとそのプラン(Query and Planning)
  • クエリのプロファイル(Query Profile)
  • Fragment のプロファイル(Fragment Profiles)
  • オペレータのプロファイル(Operator Profiles)
  • 完全な JSON 形式のプロファイル(Full JSON Profile)

f:id:nagixx:20151222193855p:plain

これらを見ていく前に、プランをビジュアルに表示する「Visualized Plan」タブの内容が便利なので、まず見てみます。ここではプランの非循環有向グラフ(DAG)が色分けされて表示されます。実際の処理は下から上に向けて流れていきます。

グラフの各ノードはスキャン、フィルタ、集約などの処理を表すオペレータで、Major Fragment ごとに異なる色が付けられています。よくみると、Major Fragment はクラスタノード間でデータの交換が行われる Exchange オペレータを境に分割されていることがわかります。

また、各ノードについている番号は、前回の記事でも出てきた <Major Fragment ID>-<Operator ID> を示しています。

f:id:nagixx:20151222195038p:plain

さて、Fragment Profiles の欄に目を移すと、各 Major Fragment の処理がどのように並列化され、各処理にどれだけ時間がかかったかがわかるタイムラインの表示があります。それぞれの色は Visualized Plan の Major Fragment の色に対応しており、並列度が大きいほど上下方向の幅が太く表示されることになります。

下の例では、水色の Major Fragment 04 の処理は一瞬で完了し、緑色の Major Fragment 03 の処理はある程度の並列度でしばらく進んだ後、若干のばらつきがありつつ完了していることがわかります。もしこの Major Fragment の表示が極端な段差を示している場合には、データの偏りやデータローカリティの問題が疑われます。

f:id:nagixx:20151223030602p:plain

さらにもう少し詳細に Major Fragment の内容を見ていくには、Fragment Profiles の「Major Fragment: <Major Fragment ID>-xx-xx」と書かれた帯をクリックすると、詳細が展開表示されます。

各行の先頭についている番号は <Major Fragment ID>-<Minor Fragment>-xx を表しています。Minor Fragment というのは、Major Fragment を並列化した一つ一つの処理のことです。各 Minor Fragment の実行ノードや処理時間、処理レコード数、ピークメモリ量などを見ることができます。

下の例では、Major Fragment 03 が 9 並列に分割され、3ノードに均等に割り当てられていることがわかります。

f:id:nagixx:20151223031816p:plain

ところで上記の Minor Fragment の情報では並列に実行される様子がわかりましたが、Minor Fragment 内に含まれている個々のオペレータがどれくらいのリソースを使って実行されたかまではわかりません。これを確認するためには Operator Profiles に注目します。

各行の先頭についている番号は <Major Fragment ID>-xx-<Operator ID> です。そして各オペレータのセットアップ時間、処理時間、待ち時間、ピークメモリ量などを見ることができます。ここではオペレータに焦点を置いているため Minor Fragment の区別はしていませんが、最小値、平均値、最大値は Minor Fragment 間で集約した結果の値です。

f:id:nagixx:20151223033352p:plain

もしさらに詳細な Minor Fragment ごとのオペレータのプロファイルが欲しい場合は、Operator Profiles の「<Major Fragment ID>-xx-<Operator ID> - <Operator Name>」が書かれた帯をクリックします。すると、<Major Fragment ID>-<Minor Fragment>-<Operator ID> のレベルまで細分化されたプロファイルを得ることができます。

f:id:nagixx:20151223034516p:plain

クエリ処理のどの部分でどれだけ時間やリソースを消費しているかを把握することは、パフォーマンスチューニングの基本中の基本ですので、まずはこの UI をマスターしましょう。

クエリプランの見かたとハッキングの方法

この記事は Apache Drill Advent Calendar 2015 の19日目の記事です。

SQL が動いて正しい結果が返ってくればよい、というのであれば必要ないのですが、パフォーマンスが気になり始めたらクエリプランとプロファイルを調べていく必要が出てきます。

SQL が発行されると、Drill はクラスタ構成やデータソースに関する統計情報をヒントに、これを論理プラン→物理プラン→実行プランに展開していきます。Drill のクエリの実行の様子については下記の記事を参考にしてください。

クエリの論理プランと物理プランは、EXPLAIN コマンドで確認することができます。ただし、実行プランについては見ることができませんので、確認したい場合には Drill の Web UI でクエリプロファイルを見る必要があります。

さて、まず論理プランは SQL のオペレータを Drill のパーサが論理オペレータに変換することで作られます。ここでのポイントは、論理プランには実際のクラスタ構成やデータに基づく並列化や最適化などが含まれないという点です。

下記は論理プランを表示した例です。EXPLAIN に WITHOUT IMPLEMENTATION をつけることで論理プランの表示になります。下から上の方向に、JSON ファイルのスキャン、フィルタリング、グループ集計、プロジェクションが順に行われることが分かります。

0: jdbc:drill:zk=local> EXPLAIN PLAN WITHOUT IMPLEMENTATION FOR SELECT type t, COUNT(DISTINCT id) FROM dfs.`/tmp/donuts.json` WHERE type='donut' GROUP BY type;
+------------+------------+
|   text    |   json    |
+------------+------------+
| DrillScreenRel
  DrillProjectRel(t=[$0], EXPR$1=[$1])
    DrillAggregateRel(group=[{0}], EXPR$1=[COUNT($1)])
    DrillAggregateRel(group=[{0, 1}])
        DrillFilterRel(condition=[=($0, 'donut')])
        DrillScanRel(table=dfs, /tmp/donuts.json, groupscan=[EasyGroupScan [selectionRoot=/tmp/donuts.json, numFiles=1, columns=[`type`, `id`], files=[file:/tmp/donuts.json]]]) | {
  "head" : {
    "version" : 1,
    "generator" : {
    "type" : "org.apache.drill.exec.planner.logical.DrillImplementor",
    "info" : ""
    },
    "type" : "APACHE_DRILL_LOGICAL",
    "options" : null,
    "queue" : 0,
    "resultMode" : "LOGICAL"
  }, ...

一方、物理プランは Drill のオプティマイザが様々なルールを元にオペレータや関数の再構成を行った後の最適化されたプランです。物理プランはクエリの実行パフォーマンスに大きく影響を与えるため、意図通りに物理プランが作成されているか、もしくはさらに最適化できる余地がないかどうかを探る際に、とても重要です。さらに、物理プランの一部を変更してクエリを再実行してみる、ということも可能です。

物理プランを EXPLAIN で表示してみましょう。今度は WITHOUT IMPLEMENTATION をつけていません。

0: jdbc:drill:zk=local> EXPLAIN PLAN FOR SELECT type t, COUNT(DISTINCT id) FROM dfs.`/tmp/donuts.json` WHERE type='donut' GROUP BY type;
+------------+------------+
|   text    |   json    |
+------------+------------+
| 00-00 Screen
00-01   Project(t=[$0], EXPR$1=[$1])
00-02       Project(t=[$0], EXPR$1=[$1])
00-03       HashAgg(group=[{0}], EXPR$1=[COUNT($1)])
00-04           HashAgg(group=[{0, 1}])
00-05           SelectionVectorRemover
00-06               Filter(condition=[=($0, 'donut')])
00-07               Scan(groupscan=[EasyGroupScan [selectionRoot=/tmp/donuts.json, numFiles=1, columns=[`type`, `id`], files=[file:/tmp/donuts.json]]]) ...

各行の先頭には

<Major Fragment ID>-<Operator ID>

という番号が割り振られています。Major Fragment というのは、クエリ実行のフェーズを表す抽象的な概念です。Major Fragment には1つ以上のオペレーターが含まれますが、Major Fragment 自体がタスクを実行するわけではありません。Drill のクエリプランでは、クラスタノード間でデータの交換が必要になるポイントで Major Fragment が分けられます。オペレータはフィルタリングやハッシュ集約などの処理そのものです。

Drill Query Execution: Major Fragments
https://drill.apache.org/docs/drill-query-execution/#major-fragments

さらに、より詳細な調査が必要な場合には、INCLUDING ALL ATTRIBUTES をつけることでクエリプランの選択の根拠として使われたコスト情報に関する情報が表示されます。予想される処理行数、CPU、メモリ、ディスク I/O、ネットワーク I/O リソースの見積もりを確認することで、そのクエリが効率的に実行されるかどうかの判断の助けになります。

0: jdbc:drill:zk=local> EXPLAIN PLAN INCLUDING ALL ATTRIBUTES FOR SELECT * FROM dfs.`/tmp/donuts.json` WHERE type='donut';
+------------+------------+
|   text    |   json    |
+------------+------------+
| 00-00 Screen: rowcount = 1.0, cumulative cost = {5.1 rows, 21.1 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 889
00-01   Project(*=[$0]): rowcount = 1.0, cumulative cost = {5.0 rows, 21.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 888
00-02       Project(T1¦¦*=[$0]): rowcount = 1.0, cumulative cost = {4.0 rows, 17.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 887
00-03       SelectionVectorRemover: rowcount = 1.0, cumulative cost = {3.0 rows, 13.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 886
00-04           Filter(condition=[=($1, 'donut')]): rowcount = 1.0, cumulative cost = {2.0 rows, 12.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 885
00-05           Project(T1¦¦*=[$0], type=[$1]): rowcount = 1.0, cumulative cost = {1.0 rows, 8.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 884
00-06               Scan(groupscan=[EasyGroupScan [selectionRoot=/tmp/donuts.json, numFiles=1, columns=[`*`], files=[file:/tmp/donuts.json]]]): rowcount = 1.0, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 883 ...

例えば、前回の記事ではジョインの最適化について書きましたが、Distributed Join が選択されるのか、Broadcast join が選択されるのか、その選択の基準になったコスト情報はどうか、ということをクエリプランから読み取ることが可能です。

最後に、Drill が生成した物理プランを変更し再実行するハッキングの方法を紹介します。パフォーマンスの向上を模索するときに、一部のオペレータを変えてみたらどうなるのか、を確かめたいときに役に立ちます。

  1. EXPLAIN PLAN FOR をクエリの先頭につけて、物理プランを表示する
  2. 物理プランの JSON 出力をコピーして、直接プランを望むように編集する
  3. Drill の Web UI(http:// <Drillbit が動作するノード>:8047)にアクセスする
  4. メニューバーから Query を選択する
  5. Query Type のラジオボタンで PHYSICAL を選択する
  6. Query 欄に編集した物理プランをペーストして Submit をクリックする

f:id:nagixx:20151220005054p:plain

Drill 内部のジョインストラテジー

この記事は Apache Drill Advent Calendar 2015 の18日目の記事です。

一般的な RDMBS のジョインアルゴリズムには、代表的なものとして ネストループ結合、マージ結合、ハッシュ結合などがあります。それぞれレコードへのアクセス方法や順序などが異なっており、最適なアルゴリズムを使うことでジョイン処理を効率化することができます。このアルゴリズムの選択は、通常はクエリオプティマイザの仕事です。

一方、分散型の SQL 処理エンジンでは、データが物理的に複数ノードに分散されているため、上記のジョインアルゴリズムとは別の軸として、分散環境特有のジョインプランを考える必要があります。

Drill では、「分散ジョイン (Distributed Join)」と「ブロードキャストジョイン(Broadcast Join)」の2つの選択肢があります。実際、商用の分散RDBMS(Nettezza、Vertica、Greenplum、Redshift など)や SQL on Hadoop(Hive、Impala、Presto など)でも同様のジョインの最適化が行われており、すでにおなじみの手法です。

下記は Drill と他の SQL エンジンでの呼び方を並べたものですが、本質的にはそれぞれ同じプランを表しています。

Drill での呼び方他の SQL エンジンでの呼び方
Distributed Join Shuffle Join、Common Join
Broadcast Join Map Join、Fragment Replicate Join

Distributed Join

Distributed Join においては、両方のテーブルの各レコードが、結合キーのハッシュ値に基づいてノード間に再分散されます。同じハッシュ値を持つレコード、つまり同じ結合キーをもつレコードは同じノードに存在するようになるはずです。

この「ハッシュ分散」オペレーションが行われた後、マージ結合アルゴリズムであれば両テーブルの各レコードはソートされ、ジョインが行われます。ハッシュ結合アルゴリズムであればソートは不要です。ただし、ネストループ結合の場合には Distributed Join を行うことはできません。

Broadcast Join

Broadcast Join では、ジョインが行われる前に片方のテーブルのデータ全体が、すべてのノードにブロードキャストされます。具体的には、内部表側のデータがブロードキャストされる一方、外部表側のデータはそのままで移動させません。

Broadcast Join は大きいファクトテーブル(トランザクションテーブル)と小さいディメンジョンテーブル(マスターテーブル)をジョインするような場合に便利です。大きなファクトテーブルのハッシュ分散はネットワークに負荷をかけるため、小さなディメンジョンテーブルのブロードキャストのほうがより効率的です。ただし、すべてのノードに同じデータを送るため、クラスタの構成やデータのサイズによっては最適にならない場合もあります。

Broadcast Join はネストループ結合、マージ結合、ハッシュ結合のいずれのジョインアルゴリズムにも対応します。

ジョインのタイプネストループ結合マージ結合ハッシュ結合
Distributed Join 不可
Broadcast Join

では、クエリオプティマイザはどのように Distributed Join、Broadcast Join を選択するのでしょうか。まず、内部表の行数がプロパティ planner.broadcast_threshold が表す閾値を超えていれば、Broadcast Join の候補となりえます。それに加えて、クラスタ構成やデータサイズを元にコストの計算が行われ、どのプランが効率的かの判断が行われます。

planner.broadcast_threshold を含め、選択に影響を与えるプロパティを下記に示します。

プロパティ説明デフォルト値
planner.broadcast_factor ブロードキャストのコストを計算する際の係数。小さくすると、Broadcast Join がより選択されやすくなる 1
planner.enable_broadcast_join Broadcast Join を有効にする true
planner.broadcast_threshold Broadcast Join の基準となる行数のしきい値。内部表の行数がこの値を上回る(と予想される)場合は Broadcast Join は選択されない 10000000

HBase データソースに対する Pushdown (2)

この記事は Apache Drill Advent Calendar 2015 の16日目の記事です。

前回の記事からの続きです。

前回は HBase テーブルを対象としたクエリに WHERE 句で条件を加えることで、HBase 側で Pushdown を行う実行プランが作成されている様子を確認しました。では、もう少し複雑な条件ではどうでしょうか。2つの行キーの比較を OR で結んだ条件にしてみます。下にクエリとその実行プランの HBaseScanSpec のパラメータを示します。

SELECT row_key, cf.c1
FROM hbase.`table1`
WHERE row_key = 'xxx' OR row_key = 'yyy';
startRow=xxx, stopRow=yyy\x00,
filter=FilterList OR (2/2): [
  RowFilter (EQUAL, xxx),
  RowFilter (EQUAL, yyy)
]

スキャンの開始行キーは「xxx」、終了行キーは「yyy\x00」(終了行キー自体はスキャン範囲には含まれないので、末尾に \x00 をつけて yyy も範囲に含めるようにしている)となり、範囲スキャンを行うようにした上で、RowFilter で条件の合う行キーのチェックをしています。複数の RowFilter は FilterList により OR で結合する形になっています。それなりに効率的ですね。

次に、LIKE 演算子で条件を指定してみましょう。

SELECT row_key, cf.c1
FROM hbase.`table1`
WHERE row_key LIKE 'xxx%';
startRow=xxx, stopRow=xxy,
filter=RowFilter (EQUAL, ^\x5CQxxx\x5CE.*$)

行キーに対する条件が前方一致であるため、範囲スキャンが有効になっていることがわかります。これに加え、LIKE に対応する正規表現が RowFilter のパターン文字列に指定されています。正規表現のマッチングですので、HBase API では RegexStringComparator が使われているはずです。

では、次のような LIKE の後方一致ではどうでしょうか。

SELECT row_key, cf.c1
FROM hbase.`table1`
WHERE row_key LIKE '%xxx';
startRow=, stopRow=,
filter=RowFilter (EQUAL, ^.*\x5CQxxx\x5CE$)

後方一致では行キーの範囲を絞り込めず、フルスキャンになることがわかります。

これまでの例では、暗黙的に行キーやカラムの値が UTF-8 文字列であることを想定してクエリを記述していましたが、HBase に格納した値が数値型やタイムスタンプ型だった場合は、クエリの書き方に少し注意が必要です。もし行キーの値がビッグエンディアンで格納されていれば、数値や日付の大きさ順でソートされることになるため、範囲フィルタの Pushdown を有効に利用できる可能性があります。

SELECT row_key, cf.c1
FROM hbase.`table1`
WHERE CONVERT_FROM(BYTE_SUBSTR(row_key, 1, 8), 'UINT8_BE')
BETWEEN CAST(100 AS BIGINT) AND CAST(200 AS BIGINT);
startRow=\x00\x00\x00\x00\x00\x00\x00d,
stopRow=\x00\x00\x00\x00\x00\x00\x00\xC9,
filter=null

上の例では WHERE 句の中で、まず row_key に対し BYTE_SUBSTR 関数で格納されている数値のバイト長(ここでは8バイト)分を先頭から取り出しています。さらにそれを CONVERT_FROM 関数で符号なしのビッグエンディアン(ここでは UNIT8_BE)のエンコーディングで数値型に変換しています。そして同じバイト長の数値型(ここでは BIGINT 型)にキャストした数値と比較します。これらの条件をすべて満たすことにより、範囲スキャンの Pushdown が有効になります。

最後に、Drill は [HBASE-8201] OrderedBytes: an ordered encoding strategy で HBase に追加された、OrderedBytes と呼ばれるエンコーディング手法に対応しています。このエンコーディングでは、データがバイト列として格納されるときにそのデータ型のソート順を保持することができるため、Pushdown に活用することが可能です。

SELECT row_key, cf.c1
FROM hbase.`table1`
WHERE CONVERT_FROM(row_key, 'INT_OB')
BETWEEN CAST(-32 AS INT) AND CAST(59 AS INT);
startRow=+\x7F\xFF\xFF\xE0, stopRow=+\x80\x00\x00;\x00,
filter=FilterList AND (2/2): [
  RowFilter (GREATER_OR_EQUAL, +\x7F\xFF\xFF\xE0),
  RowFilter (LESS_OR_EQUAL, +\x80\x00\x00;)
]

HBase にデータが OrderedBytes で格納されている場合、上のように数値を「INT_OB」エンコーディングで変換して比較することで、範囲スキャンが有効になっていることがわかります。

以上、様々な Pushdown による最適化を紹介しましたが、将来的にはこれ以上のテクニックが駆使されてさらに最適化が進むことが予想されます。例えば、HBase の行キーが複合キー(Intelligent Key)である場合の扱いにはまだ改良の余地があります。今後の性能向上を期待しましょう。

HBase データソースに対する Pushdown (1)

この記事は Apache Drill Advent Calendar 2015 の15日目の記事です。

HBase は Hadoop 上で動作する、「ワイドカラム型」NoSQL データベースです。RDBMS 風のテーブル構造を持ちますが、固定のスキーマを持つわけではないのでデータ構造の変更には柔軟である一方、HBase 自体にはシンプルなアクセス API があるのみで SQL クエリ機能はありません。Drill は HBase をデータストアとして使用することができるため、HBase に格納されたデータを使い慣れた SQL で柔軟に取り扱えるようになるのがメリットです。

ところが、Drill のようにデータストアと実行エンジンが分離しているアーキテクチャで、最適化をまったく考えずに実装してしまうと性能が全然出ません。最も単純な実装として、HBase でテーブルをフルスキャンして Drill 側にデータを渡し、Drill が SQL クエリの実行に必要なフィルタリングや集計を行う、という役割の分離が考えられますが、HBase のフルスキャンかなり遅いです。まあ、これは HBase に限らず Drill が使用するどのようなデータストア(MongoDB や RDBMS とか)にも当てはまります。

そこで、このような状況では一般的に Pushdown という最適化が行われます。HBase の例で言うと、まず HBase では「テーブルの各行にはユニークな行キーが存在しており、ソートされインデックスがついた状態で格納されている」という前提があるため、クエリの述語(WHERE 句の条件)によってあらかじめ行キーのスキャン範囲を絞り込めれば、I/O アクセスが大幅に減り性能向上につながります。他にも、カラムの値の比較が必要な場合に、HBase 内部の実装で比較を行ってから結果を Drill 側に渡してやったほうがより効率的で高速です。このように、より低いレイヤーの実装に処理を移譲する最適化が Pushdown です。

最近 BigQueryで150万円溶かした人の顔というエントリで、「WHERE句には何を書いてもテーブルをフルスキャンしてしまう」というのが話題になっておりましたが、BigQuery に述語の Pushdown の実装がなされていればこんなことにはならなかったはずです。Drill は BigQuery のベースになっている Dremel のオープンソース実装なので、Drill にできて BigQuery にできないことはないと思うのですが、謎ですね。

さて、実際にどのようなクエリがどんな風に Pushdown されるのかを見てみましょう。EXPLAIN PLAN FOR を SELECT の前につけることでクエリの実行プランが表示されるので、違いを確認することができます。

まず HBase テーブル全体を SELECT するクエリ。

$ apache-drill-1.3.0/bin/drill-embedded
0: jdbc:drill:zk=local> EXPLAIN PLAN FOR
. . . . . . . . . . . > SELECT row_key, cf.c1
. . . . . . . . . . . > FROM hbase.`table1`;
+------+------+
| text | json |
+------+------+
| 00-00    Screen
00-01      Project(row_key=[$0], EXPR$1=[$1])
00-02        Project(row_key=[$0], ITEM=[ITEM($1, 'c1')])
00-03          Scan(groupscan=[HBaseGroupScan [HBaseScanSpec=HBaseScanSpec [tableName=table1, startRow=null, stopRow=null, filter=null], columns=[`row_key`, `cf`.`c1`]]])
...

そして WHERE 句で行キーに条件をつけた場合のクエリ。

0: jdbc:drill:zk=local> EXPLAIN PLAN FOR
. . . . . . . . . . . > SELECT row_key, cf.c1
. . . . . . . . . . . > FROM hbase.`table1`
. . . . . . . . . . . > WHERE row_key = 'xxx';
+------+------+
| text | json |
+------+------+
| 00-00    Screen
00-01      Project(row_key=[$0], EXPR$1=[$1])
00-02        Project(row_key=[$0], ITEM=[ITEM($1, 'c1')])
00-03          Scan(groupscan=[HBaseGroupScan [HBaseScanSpec=HBaseScanSpec [tableName=table1, startRow=xxx, stopRow=xxx\x00, filter=null], columns=[`row_key`, `cf`.`c1`]]])
...

実行プランの Scan オペレーターのパラメーターの中に HBaseScanSpec という指定があり、さらに startRow, stopRow という指定もついていることがわかります。そして、WHERE 句をつけると startRow, stopRow に値が入ることも確認できます。

実際には、HBase の API で Scan オブジェクトが作られて、スキャンの開始行キー startRow、終了行キー stopRow がパラメータとして渡され、HBase の範囲スキャン操作が行われます。WHERE 句に行キーの条件をつけることで、大幅に処理が効率化されることがわかります。

別の例も見てみましょう。

0: jdbc:drill:zk=local> SELECT row_key, cf.c1
. . . . . . . . . . . > FROM hbase.`table1`
. . . . . . . . . . . > WHERE cf.c1 = 'xxx';
+------+------+
| text | json |
+------+------+
| 00-00    Screen
00-01      Project(row_key=[$0], EXPR$1=[$1])
00-02        Project(row_key=[$0], ITEM=[ITEM($1, 'c1')])
00-03          Scan(groupscan=[HBaseGroupScan [HBaseScanSpec=HBaseScanSpec [tableName=table1, startRow=null, stopRow=null, filter=SingleColumnValueFilter (cf, c1, EQUAL, xxx)], columns=[`row_key`, `cf`.`c1`]]])
...

今度は行キーの条件はつけていないため範囲スキャンは行われませんが、カラムの値の比較条件がついていることで、filter というパラメータに SingleColumnValueFilter の指定が出てきます。これは HBase のスキャン時に、そのまま SingleColumnValueFilter のパラメータとして HBase 内部のフィルタリング処理に使われます。

以上のように、Pushdown によるクエリ実行計画の最適化の様子がおわかりいただけたかと思いますが、クエリの書き方によっては Pushdown が期待通りに働かないケースもあります。そのあたりの注意点は次の記事に続きます。

テキストファイルとNULLの扱い

この記事は Apache Drill Advent Calendar 2015 の14日目の記事です。

CSV 形式などのテキストファイルでどのように NULL を表現するかは、CSV を出力する RDBMS やアプリケーション毎に異なっているので結構悩みのタネですね。

  • Oracle(SPOOL を使用): 引用符のない空文字
  • SQL Server: 引用符のない空文字
  • DB2: 引用符のない空文字
  • MySQL: \N
  • PostgreSQL: \N(テキスト)、引用符のない空文字(CSV

デフォルトでは上記のような感じだと思いますが、これってオプションで形式を変えることもできますし、この他にも色々なアプリケーションがあるので、その都度表現形式を確認して処理をする必要があります。

さて、Drill では CSV 形式などのテキストファイルをデータソースとする場合は、すべて可変長文字列型として扱われます。なので、次のようなファイルがあったとしたら、

$ cat test.csv
"Japan","14","0"
"United States","23","3"
"China",\N,"5"
"France","16",

Drill で SELECT した結果は次のようになります。\N も引用符のない空文字も、NULL ではなく空文字列として扱われているのがわかります。

$ apache-drill-1.3.0/bin/drill-embedded
0: jdbc:drill:zk=local> SELECT columns[0], columns[1], columns[2] FROM dfs.`/tmp/test.csv`;
+----------------+---------+---------+
|     EXPR$0     | EXPR$1  | EXPR$2  |
+----------------+---------+---------+
| Japan          | 14      | 0       |
| United States  | 23      | 3       |
| China          | \N      | 5       |
| France         | 16      |         |
+----------------+---------+---------+
4 rows selected (0.093 seconds)

では、例えば引用符のない空文字を NULL として扱いたい場合はどうすればよいでしょうか。

方法1: CASE を使う

CASE を使用して、空文字であれば NULL に置き換えます。この方法は NULL を表現するのに他の形式が使われたときにも有効です。あとは、カラム毎に扱いを変えたいときにも柔軟に対応できますね。

0: jdbc:drill:zk=local> SELECT
. . . . . . . . . . . >   columns[0],
. . . . . . . . . . . >   CASE WHEN columns[2] = '' THEN NULL ELSE columns[1] END,
. . . . . . . . . . . > FROM dfs.`/tmp/test.csv`;
+----------------+---------+
|     EXPR$0     | EXPR$1  |
+----------------+---------+
| Japan          | 0       |
| United States  | 3       |
| China          | 5       |
| France         | null    |
+----------------+---------+
4 rows selected (0.116 seconds)

方法2: drill.exec.functions.cast_empty_string_to_null プロパティを使う

drill.exec.functions.cast_empty_string_to_null プロパティを true に設定すると、空文字列が「キャストをする時に」NULL として扱われるようになります。キャストをしない場合には、そのまま空文字列として扱われます。

0: jdbc:drill:zk=local> ALTER SYSTEM SET `drill.exec.functions.cast_empty_string_to_null` = true;
+-------+----------------------------------------------------------+
|  ok   |                         summary                          |
+-------+----------------------------------------------------------+
| true  | drill.exec.functions.cast_empty_string_to_null updated.  |
+-------+----------------------------------------------------------+
1 row selected (0.08 seconds)
0: jdbc:drill:zk=local> SELECT columns[0], CAST(columns[2] AS INT) FROM dfs.`/tmp/test.csv`;
+----------------+---------+
|     EXPR$0     | EXPR$1  |
+----------------+---------+
| Japan          | 0       |
| United States  | 3       |
| China          | 5       |
| France         | null    |
+----------------+---------+
4 rows selected (0.139 seconds)