クエリプランの見かたとハッキングの方法

この記事は Apache Drill Advent Calendar 2015 の19日目の記事です。

SQL が動いて正しい結果が返ってくればよい、というのであれば必要ないのですが、パフォーマンスが気になり始めたらクエリプランとプロファイルを調べていく必要が出てきます。

SQL が発行されると、Drill はクラスタ構成やデータソースに関する統計情報をヒントに、これを論理プラン→物理プラン→実行プランに展開していきます。Drill のクエリの実行の様子については下記の記事を参考にしてください。

クエリの論理プランと物理プランは、EXPLAIN コマンドで確認することができます。ただし、実行プランについては見ることができませんので、確認したい場合には Drill の Web UI でクエリプロファイルを見る必要があります。

さて、まず論理プランは SQL のオペレータを Drill のパーサが論理オペレータに変換することで作られます。ここでのポイントは、論理プランには実際のクラスタ構成やデータに基づく並列化や最適化などが含まれないという点です。

下記は論理プランを表示した例です。EXPLAIN に WITHOUT IMPLEMENTATION をつけることで論理プランの表示になります。下から上の方向に、JSON ファイルのスキャン、フィルタリング、グループ集計、プロジェクションが順に行われることが分かります。

0: jdbc:drill:zk=local> EXPLAIN PLAN WITHOUT IMPLEMENTATION FOR SELECT type t, COUNT(DISTINCT id) FROM dfs.`/tmp/donuts.json` WHERE type='donut' GROUP BY type;
+------------+------------+
|   text    |   json    |
+------------+------------+
| DrillScreenRel
  DrillProjectRel(t=[$0], EXPR$1=[$1])
    DrillAggregateRel(group=[{0}], EXPR$1=[COUNT($1)])
    DrillAggregateRel(group=[{0, 1}])
        DrillFilterRel(condition=[=($0, 'donut')])
        DrillScanRel(table=dfs, /tmp/donuts.json, groupscan=[EasyGroupScan [selectionRoot=/tmp/donuts.json, numFiles=1, columns=[`type`, `id`], files=[file:/tmp/donuts.json]]]) | {
  "head" : {
    "version" : 1,
    "generator" : {
    "type" : "org.apache.drill.exec.planner.logical.DrillImplementor",
    "info" : ""
    },
    "type" : "APACHE_DRILL_LOGICAL",
    "options" : null,
    "queue" : 0,
    "resultMode" : "LOGICAL"
  }, ...

一方、物理プランは Drill のオプティマイザが様々なルールを元にオペレータや関数の再構成を行った後の最適化されたプランです。物理プランはクエリの実行パフォーマンスに大きく影響を与えるため、意図通りに物理プランが作成されているか、もしくはさらに最適化できる余地がないかどうかを探る際に、とても重要です。さらに、物理プランの一部を変更してクエリを再実行してみる、ということも可能です。

物理プランを EXPLAIN で表示してみましょう。今度は WITHOUT IMPLEMENTATION をつけていません。

0: jdbc:drill:zk=local> EXPLAIN PLAN FOR SELECT type t, COUNT(DISTINCT id) FROM dfs.`/tmp/donuts.json` WHERE type='donut' GROUP BY type;
+------------+------------+
|   text    |   json    |
+------------+------------+
| 00-00 Screen
00-01   Project(t=[$0], EXPR$1=[$1])
00-02       Project(t=[$0], EXPR$1=[$1])
00-03       HashAgg(group=[{0}], EXPR$1=[COUNT($1)])
00-04           HashAgg(group=[{0, 1}])
00-05           SelectionVectorRemover
00-06               Filter(condition=[=($0, 'donut')])
00-07               Scan(groupscan=[EasyGroupScan [selectionRoot=/tmp/donuts.json, numFiles=1, columns=[`type`, `id`], files=[file:/tmp/donuts.json]]]) ...

各行の先頭には

<Major Fragment ID>-<Operator ID>

という番号が割り振られています。Major Fragment というのは、クエリ実行のフェーズを表す抽象的な概念です。Major Fragment には1つ以上のオペレーターが含まれますが、Major Fragment 自体がタスクを実行するわけではありません。Drill のクエリプランでは、クラスタノード間でデータの交換が必要になるポイントで Major Fragment が分けられます。オペレータはフィルタリングやハッシュ集約などの処理そのものです。

Drill Query Execution: Major Fragments
https://drill.apache.org/docs/drill-query-execution/#major-fragments

さらに、より詳細な調査が必要な場合には、INCLUDING ALL ATTRIBUTES をつけることでクエリプランの選択の根拠として使われたコスト情報に関する情報が表示されます。予想される処理行数、CPU、メモリ、ディスク I/O、ネットワーク I/O リソースの見積もりを確認することで、そのクエリが効率的に実行されるかどうかの判断の助けになります。

0: jdbc:drill:zk=local> EXPLAIN PLAN INCLUDING ALL ATTRIBUTES FOR SELECT * FROM dfs.`/tmp/donuts.json` WHERE type='donut';
+------------+------------+
|   text    |   json    |
+------------+------------+
| 00-00 Screen: rowcount = 1.0, cumulative cost = {5.1 rows, 21.1 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 889
00-01   Project(*=[$0]): rowcount = 1.0, cumulative cost = {5.0 rows, 21.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 888
00-02       Project(T1¦¦*=[$0]): rowcount = 1.0, cumulative cost = {4.0 rows, 17.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 887
00-03       SelectionVectorRemover: rowcount = 1.0, cumulative cost = {3.0 rows, 13.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 886
00-04           Filter(condition=[=($1, 'donut')]): rowcount = 1.0, cumulative cost = {2.0 rows, 12.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 885
00-05           Project(T1¦¦*=[$0], type=[$1]): rowcount = 1.0, cumulative cost = {1.0 rows, 8.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 884
00-06               Scan(groupscan=[EasyGroupScan [selectionRoot=/tmp/donuts.json, numFiles=1, columns=[`*`], files=[file:/tmp/donuts.json]]]): rowcount = 1.0, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 883 ...

例えば、前回の記事ではジョインの最適化について書きましたが、Distributed Join が選択されるのか、Broadcast join が選択されるのか、その選択の基準になったコスト情報はどうか、ということをクエリプランから読み取ることが可能です。

最後に、Drill が生成した物理プランを変更し再実行するハッキングの方法を紹介します。パフォーマンスの向上を模索するときに、一部のオペレータを変えてみたらどうなるのか、を確かめたいときに役に立ちます。

  1. EXPLAIN PLAN FOR をクエリの先頭につけて、物理プランを表示する
  2. 物理プランの JSON 出力をコピーして、直接プランを望むように編集する
  3. Drill の Web UI(http:// <Drillbit が動作するノード>:8047)にアクセスする
  4. メニューバーから Query を選択する
  5. Query Type のラジオボタンで PHYSICAL を選択する
  6. Query 欄に編集した物理プランをペーストして Submit をクリックする

f:id:nagixx:20151220005054p:plain

Drill 内部のジョインストラテジー

この記事は Apache Drill Advent Calendar 2015 の18日目の記事です。

一般的な RDMBS のジョインアルゴリズムには、代表的なものとして ネストループ結合、マージ結合、ハッシュ結合などがあります。それぞれレコードへのアクセス方法や順序などが異なっており、最適なアルゴリズムを使うことでジョイン処理を効率化することができます。このアルゴリズムの選択は、通常はクエリオプティマイザの仕事です。

一方、分散型の SQL 処理エンジンでは、データが物理的に複数ノードに分散されているため、上記のジョインアルゴリズムとは別の軸として、分散環境特有のジョインプランを考える必要があります。

Drill では、「分散ジョイン (Distributed Join)」と「ブロードキャストジョイン(Broadcast Join)」の2つの選択肢があります。実際、商用の分散RDBMS(Nettezza、Vertica、Greenplum、Redshift など)や SQL on Hadoop(Hive、Impala、Presto など)でも同様のジョインの最適化が行われており、すでにおなじみの手法です。

下記は Drill と他の SQL エンジンでの呼び方を並べたものですが、本質的にはそれぞれ同じプランを表しています。

Drill での呼び方他の SQL エンジンでの呼び方
Distributed Join Shuffle Join、Common Join
Broadcast Join Map Join、Fragment Replicate Join

Distributed Join

Distributed Join においては、両方のテーブルの各レコードが、結合キーのハッシュ値に基づいてノード間に再分散されます。同じハッシュ値を持つレコード、つまり同じ結合キーをもつレコードは同じノードに存在するようになるはずです。

この「ハッシュ分散」オペレーションが行われた後、マージ結合アルゴリズムであれば両テーブルの各レコードはソートされ、ジョインが行われます。ハッシュ結合アルゴリズムであればソートは不要です。ただし、ネストループ結合の場合には Distributed Join を行うことはできません。

Broadcast Join

Broadcast Join では、ジョインが行われる前に片方のテーブルのデータ全体が、すべてのノードにブロードキャストされます。具体的には、内部表側のデータがブロードキャストされる一方、外部表側のデータはそのままで移動させません。

Broadcast Join は大きいファクトテーブル(トランザクションテーブル)と小さいディメンジョンテーブル(マスターテーブル)をジョインするような場合に便利です。大きなファクトテーブルのハッシュ分散はネットワークに負荷をかけるため、小さなディメンジョンテーブルのブロードキャストのほうがより効率的です。ただし、すべてのノードに同じデータを送るため、クラスタの構成やデータのサイズによっては最適にならない場合もあります。

Broadcast Join はネストループ結合、マージ結合、ハッシュ結合のいずれのジョインアルゴリズムにも対応します。

ジョインのタイプネストループ結合マージ結合ハッシュ結合
Distributed Join 不可
Broadcast Join

では、クエリオプティマイザはどのように Distributed Join、Broadcast Join を選択するのでしょうか。まず、内部表の行数がプロパティ planner.broadcast_threshold が表す閾値を超えていれば、Broadcast Join の候補となりえます。それに加えて、クラスタ構成やデータサイズを元にコストの計算が行われ、どのプランが効率的かの判断が行われます。

planner.broadcast_threshold を含め、選択に影響を与えるプロパティを下記に示します。

プロパティ説明デフォルト値
planner.broadcast_factor ブロードキャストのコストを計算する際の係数。小さくすると、Broadcast Join がより選択されやすくなる 1
planner.enable_broadcast_join Broadcast Join を有効にする true
planner.broadcast_threshold Broadcast Join の基準となる行数のしきい値。内部表の行数がこの値を上回る(と予想される)場合は Broadcast Join は選択されない 10000000

HBase データソースに対する Pushdown (2)

この記事は Apache Drill Advent Calendar 2015 の16日目の記事です。

前回の記事からの続きです。

前回は HBase テーブルを対象としたクエリに WHERE 句で条件を加えることで、HBase 側で Pushdown を行う実行プランが作成されている様子を確認しました。では、もう少し複雑な条件ではどうでしょうか。2つの行キーの比較を OR で結んだ条件にしてみます。下にクエリとその実行プランの HBaseScanSpec のパラメータを示します。

SELECT row_key, cf.c1
FROM hbase.`table1`
WHERE row_key = 'xxx' OR row_key = 'yyy';
startRow=xxx, stopRow=yyy\x00,
filter=FilterList OR (2/2): [
  RowFilter (EQUAL, xxx),
  RowFilter (EQUAL, yyy)
]

スキャンの開始行キーは「xxx」、終了行キーは「yyy\x00」(終了行キー自体はスキャン範囲には含まれないので、末尾に \x00 をつけて yyy も範囲に含めるようにしている)となり、範囲スキャンを行うようにした上で、RowFilter で条件の合う行キーのチェックをしています。複数の RowFilter は FilterList により OR で結合する形になっています。それなりに効率的ですね。

次に、LIKE 演算子で条件を指定してみましょう。

SELECT row_key, cf.c1
FROM hbase.`table1`
WHERE row_key LIKE 'xxx%';
startRow=xxx, stopRow=xxy,
filter=RowFilter (EQUAL, ^\x5CQxxx\x5CE.*$)

行キーに対する条件が前方一致であるため、範囲スキャンが有効になっていることがわかります。これに加え、LIKE に対応する正規表現が RowFilter のパターン文字列に指定されています。正規表現のマッチングですので、HBase API では RegexStringComparator が使われているはずです。

では、次のような LIKE の後方一致ではどうでしょうか。

SELECT row_key, cf.c1
FROM hbase.`table1`
WHERE row_key LIKE '%xxx';
startRow=, stopRow=,
filter=RowFilter (EQUAL, ^.*\x5CQxxx\x5CE$)

後方一致では行キーの範囲を絞り込めず、フルスキャンになることがわかります。

これまでの例では、暗黙的に行キーやカラムの値が UTF-8 文字列であることを想定してクエリを記述していましたが、HBase に格納した値が数値型やタイムスタンプ型だった場合は、クエリの書き方に少し注意が必要です。もし行キーの値がビッグエンディアンで格納されていれば、数値や日付の大きさ順でソートされることになるため、範囲フィルタの Pushdown を有効に利用できる可能性があります。

SELECT row_key, cf.c1
FROM hbase.`table1`
WHERE CONVERT_FROM(BYTE_SUBSTR(row_key, 1, 8), 'UINT8_BE')
BETWEEN CAST(100 AS BIGINT) AND CAST(200 AS BIGINT);
startRow=\x00\x00\x00\x00\x00\x00\x00d,
stopRow=\x00\x00\x00\x00\x00\x00\x00\xC9,
filter=null

上の例では WHERE 句の中で、まず row_key に対し BYTE_SUBSTR 関数で格納されている数値のバイト長(ここでは8バイト)分を先頭から取り出しています。さらにそれを CONVERT_FROM 関数で符号なしのビッグエンディアン(ここでは UNIT8_BE)のエンコーディングで数値型に変換しています。そして同じバイト長の数値型(ここでは BIGINT 型)にキャストした数値と比較します。これらの条件をすべて満たすことにより、範囲スキャンの Pushdown が有効になります。

最後に、Drill は [HBASE-8201] OrderedBytes: an ordered encoding strategy で HBase に追加された、OrderedBytes と呼ばれるエンコーディング手法に対応しています。このエンコーディングでは、データがバイト列として格納されるときにそのデータ型のソート順を保持することができるため、Pushdown に活用することが可能です。

SELECT row_key, cf.c1
FROM hbase.`table1`
WHERE CONVERT_FROM(row_key, 'INT_OB')
BETWEEN CAST(-32 AS INT) AND CAST(59 AS INT);
startRow=+\x7F\xFF\xFF\xE0, stopRow=+\x80\x00\x00;\x00,
filter=FilterList AND (2/2): [
  RowFilter (GREATER_OR_EQUAL, +\x7F\xFF\xFF\xE0),
  RowFilter (LESS_OR_EQUAL, +\x80\x00\x00;)
]

HBase にデータが OrderedBytes で格納されている場合、上のように数値を「INT_OB」エンコーディングで変換して比較することで、範囲スキャンが有効になっていることがわかります。

以上、様々な Pushdown による最適化を紹介しましたが、将来的にはこれ以上のテクニックが駆使されてさらに最適化が進むことが予想されます。例えば、HBase の行キーが複合キー(Intelligent Key)である場合の扱いにはまだ改良の余地があります。今後の性能向上を期待しましょう。

HBase データソースに対する Pushdown (1)

この記事は Apache Drill Advent Calendar 2015 の15日目の記事です。

HBase は Hadoop 上で動作する、「ワイドカラム型」NoSQL データベースです。RDBMS 風のテーブル構造を持ちますが、固定のスキーマを持つわけではないのでデータ構造の変更には柔軟である一方、HBase 自体にはシンプルなアクセス API があるのみで SQL クエリ機能はありません。Drill は HBase をデータストアとして使用することができるため、HBase に格納されたデータを使い慣れた SQL で柔軟に取り扱えるようになるのがメリットです。

ところが、Drill のようにデータストアと実行エンジンが分離しているアーキテクチャで、最適化をまったく考えずに実装してしまうと性能が全然出ません。最も単純な実装として、HBase でテーブルをフルスキャンして Drill 側にデータを渡し、Drill が SQL クエリの実行に必要なフィルタリングや集計を行う、という役割の分離が考えられますが、HBase のフルスキャンかなり遅いです。まあ、これは HBase に限らず Drill が使用するどのようなデータストア(MongoDB や RDBMS とか)にも当てはまります。

そこで、このような状況では一般的に Pushdown という最適化が行われます。HBase の例で言うと、まず HBase では「テーブルの各行にはユニークな行キーが存在しており、ソートされインデックスがついた状態で格納されている」という前提があるため、クエリの述語(WHERE 句の条件)によってあらかじめ行キーのスキャン範囲を絞り込めれば、I/O アクセスが大幅に減り性能向上につながります。他にも、カラムの値の比較が必要な場合に、HBase 内部の実装で比較を行ってから結果を Drill 側に渡してやったほうがより効率的で高速です。このように、より低いレイヤーの実装に処理を移譲する最適化が Pushdown です。

最近 BigQueryで150万円溶かした人の顔というエントリで、「WHERE句には何を書いてもテーブルをフルスキャンしてしまう」というのが話題になっておりましたが、BigQuery に述語の Pushdown の実装がなされていればこんなことにはならなかったはずです。Drill は BigQuery のベースになっている Dremel のオープンソース実装なので、Drill にできて BigQuery にできないことはないと思うのですが、謎ですね。

さて、実際にどのようなクエリがどんな風に Pushdown されるのかを見てみましょう。EXPLAIN PLAN FOR を SELECT の前につけることでクエリの実行プランが表示されるので、違いを確認することができます。

まず HBase テーブル全体を SELECT するクエリ。

$ apache-drill-1.3.0/bin/drill-embedded
0: jdbc:drill:zk=local> EXPLAIN PLAN FOR
. . . . . . . . . . . > SELECT row_key, cf.c1
. . . . . . . . . . . > FROM hbase.`table1`;
+------+------+
| text | json |
+------+------+
| 00-00    Screen
00-01      Project(row_key=[$0], EXPR$1=[$1])
00-02        Project(row_key=[$0], ITEM=[ITEM($1, 'c1')])
00-03          Scan(groupscan=[HBaseGroupScan [HBaseScanSpec=HBaseScanSpec [tableName=table1, startRow=null, stopRow=null, filter=null], columns=[`row_key`, `cf`.`c1`]]])
...

そして WHERE 句で行キーに条件をつけた場合のクエリ。

0: jdbc:drill:zk=local> EXPLAIN PLAN FOR
. . . . . . . . . . . > SELECT row_key, cf.c1
. . . . . . . . . . . > FROM hbase.`table1`
. . . . . . . . . . . > WHERE row_key = 'xxx';
+------+------+
| text | json |
+------+------+
| 00-00    Screen
00-01      Project(row_key=[$0], EXPR$1=[$1])
00-02        Project(row_key=[$0], ITEM=[ITEM($1, 'c1')])
00-03          Scan(groupscan=[HBaseGroupScan [HBaseScanSpec=HBaseScanSpec [tableName=table1, startRow=xxx, stopRow=xxx\x00, filter=null], columns=[`row_key`, `cf`.`c1`]]])
...

実行プランの Scan オペレーターのパラメーターの中に HBaseScanSpec という指定があり、さらに startRow, stopRow という指定もついていることがわかります。そして、WHERE 句をつけると startRow, stopRow に値が入ることも確認できます。

実際には、HBase の API で Scan オブジェクトが作られて、スキャンの開始行キー startRow、終了行キー stopRow がパラメータとして渡され、HBase の範囲スキャン操作が行われます。WHERE 句に行キーの条件をつけることで、大幅に処理が効率化されることがわかります。

別の例も見てみましょう。

0: jdbc:drill:zk=local> SELECT row_key, cf.c1
. . . . . . . . . . . > FROM hbase.`table1`
. . . . . . . . . . . > WHERE cf.c1 = 'xxx';
+------+------+
| text | json |
+------+------+
| 00-00    Screen
00-01      Project(row_key=[$0], EXPR$1=[$1])
00-02        Project(row_key=[$0], ITEM=[ITEM($1, 'c1')])
00-03          Scan(groupscan=[HBaseGroupScan [HBaseScanSpec=HBaseScanSpec [tableName=table1, startRow=null, stopRow=null, filter=SingleColumnValueFilter (cf, c1, EQUAL, xxx)], columns=[`row_key`, `cf`.`c1`]]])
...

今度は行キーの条件はつけていないため範囲スキャンは行われませんが、カラムの値の比較条件がついていることで、filter というパラメータに SingleColumnValueFilter の指定が出てきます。これは HBase のスキャン時に、そのまま SingleColumnValueFilter のパラメータとして HBase 内部のフィルタリング処理に使われます。

以上のように、Pushdown によるクエリ実行計画の最適化の様子がおわかりいただけたかと思いますが、クエリの書き方によっては Pushdown が期待通りに働かないケースもあります。そのあたりの注意点は次の記事に続きます。

テキストファイルとNULLの扱い

この記事は Apache Drill Advent Calendar 2015 の14日目の記事です。

CSV 形式などのテキストファイルでどのように NULL を表現するかは、CSV を出力する RDBMS やアプリケーション毎に異なっているので結構悩みのタネですね。

  • Oracle(SPOOL を使用): 引用符のない空文字
  • SQL Server: 引用符のない空文字
  • DB2: 引用符のない空文字
  • MySQL: \N
  • PostgreSQL: \N(テキスト)、引用符のない空文字(CSV

デフォルトでは上記のような感じだと思いますが、これってオプションで形式を変えることもできますし、この他にも色々なアプリケーションがあるので、その都度表現形式を確認して処理をする必要があります。

さて、Drill では CSV 形式などのテキストファイルをデータソースとする場合は、すべて可変長文字列型として扱われます。なので、次のようなファイルがあったとしたら、

$ cat test.csv
"Japan","14","0"
"United States","23","3"
"China",\N,"5"
"France","16",

Drill で SELECT した結果は次のようになります。\N も引用符のない空文字も、NULL ではなく空文字列として扱われているのがわかります。

$ apache-drill-1.3.0/bin/drill-embedded
0: jdbc:drill:zk=local> SELECT columns[0], columns[1], columns[2] FROM dfs.`/tmp/test.csv`;
+----------------+---------+---------+
|     EXPR$0     | EXPR$1  | EXPR$2  |
+----------------+---------+---------+
| Japan          | 14      | 0       |
| United States  | 23      | 3       |
| China          | \N      | 5       |
| France         | 16      |         |
+----------------+---------+---------+
4 rows selected (0.093 seconds)

では、例えば引用符のない空文字を NULL として扱いたい場合はどうすればよいでしょうか。

方法1: CASE を使う

CASE を使用して、空文字であれば NULL に置き換えます。この方法は NULL を表現するのに他の形式が使われたときにも有効です。あとは、カラム毎に扱いを変えたいときにも柔軟に対応できますね。

0: jdbc:drill:zk=local> SELECT
. . . . . . . . . . . >   columns[0],
. . . . . . . . . . . >   CASE WHEN columns[2] = '' THEN NULL ELSE columns[1] END,
. . . . . . . . . . . > FROM dfs.`/tmp/test.csv`;
+----------------+---------+
|     EXPR$0     | EXPR$1  |
+----------------+---------+
| Japan          | 0       |
| United States  | 3       |
| China          | 5       |
| France         | null    |
+----------------+---------+
4 rows selected (0.116 seconds)

方法2: drill.exec.functions.cast_empty_string_to_null プロパティを使う

drill.exec.functions.cast_empty_string_to_null プロパティを true に設定すると、空文字列が「キャストをする時に」NULL として扱われるようになります。キャストをしない場合には、そのまま空文字列として扱われます。

0: jdbc:drill:zk=local> ALTER SYSTEM SET `drill.exec.functions.cast_empty_string_to_null` = true;
+-------+----------------------------------------------------------+
|  ok   |                         summary                          |
+-------+----------------------------------------------------------+
| true  | drill.exec.functions.cast_empty_string_to_null updated.  |
+-------+----------------------------------------------------------+
1 row selected (0.08 seconds)
0: jdbc:drill:zk=local> SELECT columns[0], CAST(columns[2] AS INT) FROM dfs.`/tmp/test.csv`;
+----------------+---------+
|     EXPR$0     | EXPR$1  |
+----------------+---------+
| Japan          | 0       |
| United States  | 3       |
| China          | 5       |
| France         | null    |
+----------------+---------+
4 rows selected (0.139 seconds)

Drillbit が使用するメモリサイズ

この記事は Apache Drill Advent Calendar 2015 の12日目の記事です。

Drill クラスタを構築する場合、各ノードで Drillbit という Java プロセスを立ち上げます。Drillbit は、ノードに常駐するデーモンプロセスとしてクラスタ全体で協調して動作することで、クエリを効率的に並列処理します(ちなみに Embedded モードでも、Drillbit が1つだけローカルで起動します)。

Drillbit は Java プロセスなので当然ながら Java ヒープを確保して使用しますが、Java ヒープとは別にダイレクトメモリを確保して Drillbit が直接その内部を管理しています。クエリの処理対象のデータはダイレクトメモリ上に展開して保持され、処理はできる限りオンメモリで行うことでパフォーマンスの最大化が図られます。

MapReduce などとは異なり、クエリ処理の中間でメモリ上のデータをディスクにすべて書き出すということはありませんが、もしダイレクトメモリ上にすべてのデータが載り切らない場合には、一部のデータをディスク上に一時的に退避させることがあり、その際にはディスクアクセスが発生します。このため、ダイレクトメモリのサイズはクエリの実行パフォーマンスを大きく左右する要素です。

Drillbit が使用するメモリ設定は、Drill をインストールしたディレクトリの conf/drill-env.sh に書かれています。

$ cat apache-drill-1.3.0/conf/drill-env.sh
...
DRILL_MAX_DIRECT_MEMORY="8G"
DRILL_MAX_HEAP="4G"
...

DRILL_MAX_HEAP は Java の起動オプション -Xmx に渡される Java ヒープの最大サイズで、デフォルトで 4GB に設定されています。処理対象のデータをヒープに置くことはないため、ほとんどのケースでは変更する必要はないでしょう。

一方、DRILL_MAX_DIRECT_MEMORY はダイレクトメモリの最大値です(デフォルト 8GB)。ノード上の利用可能なメモリをできるだけ多く割り当てることで、パフォーマンスを最大化することができます。

ただし、クラスタノード上で他のプロセスが動作している場合には、設定値に注意してください。例えば Hadoop クラスタでは、HDFS や HBase などのデータストアや、YARN フレームワークを動作させているのが一般的です。Drill 1.3 の時点では、まだ Drill は YARN に対応していないため、Drill のメモリの割り当てと YARN のメモリ割り当ては、合計使用量が物理メモリ容量を超えないように別々に固定で設定しておく必要があります。例えば、64GB メモリのマシンがあった場合、YARN で 32GB、HDFS で 16GB を確保した場合は、Drill で利用可能なメモリを 12GB 程度に抑えておく必要があります。

クエリ毎のリソース制御について

この記事は Apache Drill Advent Calendar 2015 の11日目の記事です。

Drill クラスタを構築して、ある程度規模の大きい SQL クエリ基盤を運用する場合、普通は複数のユーザーや複数のアプリケーションで Drill クラスタを共有する使い方をすると思います。その時に課題となるのは、個々のクエリが使うリソースをどのように制御できるかというところです。本記事では、Drill が備えるリソース制御のオプションについて見てみましょう。

まず同時に実行可能なクエリの上限について。デフォルトでは同時に実行可能なクエリの数は無制限ですが、むやみに同時実行数を増やしていくとリソースの競合が激しくなり、全体的なパフォーマンスが低下します。これを防ぐため、Drill ではクエリの「キュー」を用意し、設定した上限までのクエリをキューから取り出して同時実行するしくみがあります。

Enabling Query Queuing
https://drill.apache.org/docs/enabling-query-queuing/

exec.queue.enable プロパティを true にすると、キューが有効になります。Drill では、「large」キューと「small」キューの2つのキューが用意され、それぞれ個別に同時実行数を設定することが可能です。投入したクエリがどちらのキューに入るかは、推定される処理件数のしきい値 exec.queue.threshold プロパティによって決まります。

プロパティ説明デフォルト値
exec.queue.enable キューを有効にするかどうか false
exec.queue.large large キューに入ったクエリの同時実行数 10
exec.queue.small small キューに入ったクエリの同時実行数 100
exec.queue.threshold クエリの推定される処理件数がこの値より大きければ large キュー、小さければ small キューに投入される 30000000
exec.queue.timeout_millis クエリがキューに入ってからこの時間が経過すると失敗する(ミリ秒) 300000

また、個々のクエリ実行の並列度についても調整可能なオプションがいくつかあります。

Configuring Resources for a Shared Drillbit: Configuring Parallelization
https://drill.apache.org/docs/configuring-resources-for-a-shared-drillbit/#configuring-parallelization

プロパティ planner.width.max_per_node はクエリあたりの1ノード(クラスタを構成するマシン1台)で処理できる並列度の最大値です。例えばフィルタリングの処理は、マルチコアマシンでは複数スレッドを使って並列に実行することで効率的に処理を行えます。この設定値はクエリあたりの上限ですが、これとは別に、すべてのクエリの合計の1ノードあたりの並列度は「ノード上のアクティブな Drillbit の数 * ノードのコア数 (Hyper-threadを考慮) * 0.7」を超えることはありません。

プロパティ planner.width.max_per_query はクエリあたりの処理の並列度の最大値です。これはクラスタ全体での並列度になります。実際には、(planner.width.max_per_node * ノード数) と planner.width.max_per_query のどちらか小さい値が、クエリあたりの最大並列度になります。

プロパティ説明デフォルト値
planner.width.max_per_node クエリあたりの1ノードで処理できる並列度の最大値 3
planner.width.max_per_query クエリあたりの処理の並列度の最大値 1000

さらに、planner.memory.max_query_memory_per_node プロパティでノード単位で各クエリが利用できるメモリの最大値を指定することが可能です。

Configuring Drill Memory
https://drill.apache.org/docs/configuring-drill-memory/

Drill はクエリプランの実行に必要なメモリが足りないことが推定される場合、少ないメモリでも実行可能なプランに作り直す場合があります。また、ウィンドウ関数のような外部ソートのための大量のメモリを必要とするクエリでエラーが起きるような場合、この設定値を大きくすることでエラーを回避することができる可能性があります。

プロパティ説明デフォルト値
planner.memory.max_query_memory_per_node ノード単位で各クエリが利用できるメモリの最大値(Byte) 2147483648

上記のプロパティはシステム全体でもセッション単位でも指定できます。例えばシステム全体で指定する場合は

0: jdbc:drill:zk=local> ALTER SYSTEM SET `exec.queue.enable` = true;

セッション単位で指定する場合は

0: jdbc:drill:zk=local> ALTER SESSION SET `exec.queue.enable` = true;

を実行します。