読者です 読者をやめる 読者になる 読者になる

HDFS Snapshot + distcp と MapR-FS Volume Mirroring の違い

MapR は HDFS の代わりに MapR-FS を使用している Hadoop ディストリビューションです。性能の向上信頼性の向上ランダムリードライト可能なNFSNoSQL データベースとの統合メッセージングキューとの統合、・・・と MapR-FS のメリットは挙げればきりがないのですが、HDFS API はそのまま利用できるため、すべての Hadoop アプリケーションやライブラリは違いを意識することなく動作します。

さて、Hadoop クラスタを運用する際に、データ更新を行う業務アプリケーションと、参照がメインの分析アプリケーション間で同じデータを共有したい、というケースはよくあると思います。ただし、分析アプリはデータセットの特定の時点の一貫性のあるスナップショットに対して処理を行うべきであるため、任意の時点で更新が発生する業務アプリのデータセットにそのままアクセスするわけにはいきません。また、分析アプリが業務アプリと同時に動作した場合、負荷の増大により業務アプリの処理時間に影響を与えてしまうリスクもあります。

このような場合に、 Hadoop では特定のディレクトリのスナップショットを取得する HDFS Snapshot 機能と、分散データコピーツール distcp を使い、物理的なコピーをクラスタ内、もしくは別クラスタに作成し、特定の時点のスナップショットの複製に対してアクセスを行うことで、上記の課題を解決する方法があります。

しかし MapR にはこれと同じことをより簡単に、より効率良く行なうための Volume Mirroring 機能があります。しかもこの機能は MapR の 2011 年の最初のリリースから存在しており、安定して運用に利用されてきた実績があります。

以下では、それぞれの動作の違いについて比較してみましょう。

HDFS Snapshot + distcp の場合

手順の詳細は上記の記事を見ていただければと思いますが、同じ例を使って説明します。まず、source ディレクトリに Data.txt が存在します。

f:id:nagixx:20160410152218j:plain

hdfs コマンドで source ディレクトリのスナップショットを作ります。 

f:id:nagixx:20160410152407j:plain

distcp ツールでスナップショット s1 を target ディレクトリにコピーします。このコマンドは MapReduce ジョブを起動し、クラスタ内で並列にデータコピーが行われます。

f:id:nagixx:20160410152516j:plain

hdfs コマンドで target ディレクトリのスナップショットを作ります。これで 1 世代目のスナップショットの複製が全体コピーで作成されました。

f:id:nagixx:20160410152846j:plain

次に source ディレクトリに Data2.txt が更新データとして書き込まれました。

f:id:nagixx:20160410153010j:plain

hdfs コマンドで source ディレクトリの 2 世代目のスナップショットを作ります。

f:id:nagixx:20160410153317j:plain

distcp ツールでコピーを行いますが、今度はスナップショット s1 と s2 の内容を比較し、ファイル間で更新があったものだけの差分コピーが行われます。

f:id:nagixx:20160410153449j:plain

hdfs コマンドで target ディレクトリのスナップショットを作ります。これで 2 世代目のスナップショットの複製が差分コピーで作成されました。3 世代目以降はこれの繰り返しです。

f:id:nagixx:20160410153802j:plain

MapR-FS Volume Mirroring の場合

上記のようなデータの差分更新のサイクルを回す場合、MapR-FS では Volume という論理的な管理単位を作成しておき、ファイルシステム上に配置しておくことで運用をシンプルにできます。

Volume Mirroring 機能では、コピーの単位は Volume 全体ですので、source 側、mirror 側それぞれにあらかじめ Volume を作成しておきます。Volume を作成するには次の管理コマンドを実行します。mirror 側には <source ボリューム名>@<source クラスタ名> の形でコピー元となる Volume を指定します。

$ maprcli volume create -name source -path /user/hadoop/source
$ maprcli volume create -name target -path /user/hadoop/target -source source@demo.mapr.com -type mirror

先ほどの例と同様に、はじめに source ディレクトリに Data.txt が存在します。

f:id:nagixx:20160410154632j:plain

さて、スナップショットを取りたいタイミングで、MapR の場合は次のコマンド 1 つを実行するだけで、Mirror Volume にスナップショットのコピーが作られるところまで完了します。シンプルですね。

$ maprcli volume mirror start -name target

内部では、まず source 側では一時的なスナップショット、target 側ではタイムスタンプのついたスナップショットが同時に作成されます。その後、source の一時的なスナップショットの内容が target ボリュームにコピーされます。ここでのポイントは、コピーはバックグラウンドで非同期に行われること、コピー処理は MapReduce を起動せず、MapR-FS プロセスが行うこと、target 側ではコピー途中のファイルは見えず、完了した時点で初めてアクセスできるようになることです。コピー完了時に、source 側の一時的なスナップショットは削除されます。

f:id:nagixx:20160410160242j:plain

次に source ディレクトリに Data2.txt が更新データとして書き込まれました。

f:id:nagixx:20160410172243j:plain

2 世代目のスナップショットを作成する場合も、1 世代目とまったく同じコマンドを実行します。シンプルですね。

$ maprcli volume mirror start -name target

両方の Volume 内にスナップショットが作られるところまでは同じですが、今度は前回のコピー時から差分のあったデータだけをコピーします。HDFS ではファイル単位でしか差分を比較していませんが、MapR-FS では 8KB のデータブロック単位で更新履歴を管理しているため、実際に変更のあったファイルの一部分のみのコピーが行われます。

第 3 世代以降もこれの繰り返しです。

f:id:nagixx:20160410161843j:plain

さらに、この Mirroring のコピーのタイミングはスケジューリング設定によって完全に自動化できるため(target 側に残るスナップショットの有効期限も設定できる)、マニュアル操作不要の運用が行えます。

MapR-FS だと何がいいのか

まとめると、MapR-FS Volume Mirroring では下記のようなメリットがあり、運用上の複数の課題の解決に役立ちます。

  • 運用に必要な手順が少ないため、運用設計及び実行の負担が減る
    • オペレーションミスのリスクも減らせる
  • HDFS + MapReduce という 2 つのレイヤにまたがる処理ではなく、MapR-FS で完結する処理であるためシンプルで効率的
    • コピーで MapReduce ジョブが起動しないため、クラスタのリソースの消費を最低限に押さえられる
    • 他の業務のアプリケーションへの影響が少ない
    • 障害時の内部的な挙動もシンプルになりリスクを減らせる
  • MapR-FS の粒度の細かいデータブロック管理を最大限に活用し、短時間でコピーを完了
    • 8KBデータブロック単位の差分管理
    • MapR-FS の圧縮、チェックサムが適用された状態で信頼性が高く効率の良いコピーが可能
  • アトミックな操作で一貫性のあるスナップショットを利用可能
    • HDFS のスナップショットは NameNode 上のメタデータを利用したイメージの取得であるため、構造上データそのものとの完全な一貫性が保証できない。一方、MapR-FS ではデータとメタデータが同期された一貫性のあるスナップショットを取得できる
    • MapR-FS Volume Mirroring のコピーはアトミックな動作であるため、バックグラウンドでコピー処理実行中でも、source 側、target 側双方で Point-in-Time の一貫性のあるデータセットへのアクセスができる

2016 年に向けた注目の新機能の開発状況

この記事は Apache Drill Advent Calendar 2015 の25日目の記事です。

2015年もあとわずか。今回は Drill の JIRA チケットや GitHub を眺めつつ、2016年にどんな新機能が出てきそうか興味のおもむくままにご紹介しましょう。

Cassandra ストレージプラグイン

[DRILL-92] Cassandra storage engine

初期パッチが出て議論が行われたものの、ここしばらく動きが中断しちゃっていますね。

How to Use Apache Drill with Cassandra - Stack Overflow

こちらの議論を見ると、Drill や Cassandra の開発が進んだ結果、当初のパッチがもう使えなくなってしまっているようです。誰かー。

Couchbase ストレージプラグイン

jacques-n/drill-couchbase-plugin · GitHub
drill/contrib/storage-couchbase at couchbase-storage-plugin · tgrall/drill · GitHub

2つの作りかけの実装があります。下の方は Couchbase の N1QL を利用しているようです。

Solr/Elasticsearch ストレージプラグイン

[DRILL-3585] Apache Solr as a storage plugin
[DRILL-3637] Elasticsearch storage plugin

Solr/Elasticsearch インデックスも構造はドキュメントデータベースみたいなものですからね。SQL でクエリを投げられれば便利です。

Kudu ストレージプラグイン

[DRILL-4241] Add Experimental Kudu plugin

高速な分析に最適化したストレージ Apache Kudu に対応するストレージプラグインです。

XML フォーマットのサポート

[DRILL-3878] Support XML Querying (selects/projections, no writing)

JSON にクエリができるなら XML もできるだろうということで。JSONRecordReader をフックしている感じでしょうか。

HTTPD ログフォーマットのサポート

[DRILL-3423] Add New HTTPD format plugin

HTTPD ログを変換なしに直接 SQL で処理できるというプラグインです。これもあると便利ですね。

Excel フォーマットのサポート

[DRILL-3738] Create StoragePlugin for Excel files (.xlsx or possibly .xls) - version 1 - read only.

Apache POI を使って Excel のファイルに対応させようという試みです。企業にはたくさんありますからねえ。

INSERT INTO TABLE サポート

[DRILL-3534] Insert into table support

前から課題にはあがっていると思うのですが、実装が進んでいるのかどうかはイマイチ不明です。

分析関数の追加

[DRILL-3962] Add support of ROLLUP, CUBE, GROUPING SETS, GROUPING, GROUPING_ID, GROUP_ID support

BI 分析ワークロードに欠かせない分析関数の追加です。Drill が内部で使っている Apache Calcite にはすでに実装があるので、Drill でサポートするのはそんなに困難ではないはずです。

地理空間データのクエリ

[DRILL-3914] Support geospatial queries

PostgreSQL 向け PostGIS のような感じの、地理空間データのクエリをサポート。内部で ESRI Geometry API for Java を使うのは定番のようです。すでに 1.3 で基本的な機能は master に取り込まれているっぽいです。

JDK 8 サポート

[DRILL-1491] Support for JDK 8

結構よく聞かれる JDK 8 のサポート。まだいくつかのサブタスクが残っているようです。JDK 7 のサポート切れからしばし経過する一方 JDK 8 の導入が進んでいるので、早い対応が望まれますね。

YARN サポート

[DRILL-1170] YARN support for Drill

負荷に合わせて動的に YARN コンテナを増減するというしくみを目指す取り組み。だけどもこれもまだ動きが無いような。

Drill のユーザ認証とインパーソネーション

この記事は Apache Drill Advent Calendar 2015 の24日目の記事です。

今回は、Drill のセキュリティを向上させる2つの機能、インパーソネーションとユーザ認証について紹介します。

インパーソネーション

Drill には、クライアントから要求されたアクションを、クライアント自身の権限で実行する「インパーソネーション(Impersonation)」機能があります。デフォルトでは無効になっていますが、設定ファイル conf/drill-override.conf を編集して有効にすることができます。

インパーソネーションが有効になっている場合、Drill はユーザーの認証情報(Credential)をファイルシステムに渡し、ファイルシステムはそのユーザがデータアクセスに対する適切な権限を持っているかのチェックを行います。逆にインパーソネーションが無効になっている場合、Drill はすべてのクライアントからの要求を、Drillbit サービスを起動したユーザの権限で実行することになります。Drillbit を起動するユーザは通常は特権ユーザですが、この場合クライアントのユーザごとの細かいアクセス制御ができないことになります。

インパーソネーションを有効にするには、次のように設定ファイルを編集し、Drillbit を再起動します。

$ vi conf/drill-override.conf
drill.exec: {
  cluster-id: "cluster_name",
  zk.connect: "<hostname>:<port>,<hostname>:<port>,<hostname>:<port>",
  impersonation: {
    enabled: true
  }
}

下記の表は、Drill のどの機能がインパーソネーションに対応しているかを示しています。

タイプ対応機能非対応の機能
クライアント SQLLine, ODBC, JDBC Drill Web コンソール, REST API
ストレージプラグイン ファイルシステム Hive, HBase
クエリ

インパーソネーションが有効のとき、設定はデータとメタデータ両方のアクセスに適用されます。例えば、SHOW SCHEMAS コマンドを発行した場合、Drill はログインしたユーザの権限でメタデータにアクセスを試みます。また、ワークスペースに対する SELECT クエリを発行した場合は、ログインしたユーザの権限でデータにアクセスを試みます。インパーソネーションが適用されるコマンドは次の通りです:

  • SHOW SCHEMAS
  • SHOW DATABASES
  • SHOW TABLES
  • CREATE TABLE AS SELECT
  • SELECT
  • CREATE VIEW
  • DROP VIEW
  • SHOW FILES

CREATE TABLE AS SELECT および CREATE VIEW コマンドを実行するには、ユーザはテーブルやビューを保存するディレクトリの書き込み権限を持っている必要があることに注意してください。

 

ユーザ認証

一方で、ユーザを正しく識別するためにはユーザ認証が欠かせません。Drill では Linux PAM(Pluggable Authentication Module)によるユーザ名/パスワードをベースにした認証をサポートしています。認証は JDBC/ODBC 接続を介して行われます。PAM はインストールされたどの PAM 認証エンティティにも対応するため、ローカル OS のパスワードファイルはもとより LDAP などにも幅広く対応します。

もしインパーソネーションが有効になっていれば、ユーザ認証と併用することで、どのユーザがどのファイルにアクセスできるかということを制御できることになり、Drill システム全体のセキュリティが向上します。

PAM を利用したユーザ認証を行うには、すべての Drill ノードでユーザのユーザ名、UID、パスワードが一致している必要があります。また、/etc/passwd を使う場合には、Drillbit を起動するユーザはすべてのノードで shadow グループに所属している必要があります。

それでは、PAM 認証の設定の手順を追っていきます。まず下記のサイトから Linux 用の JPam をダウンロードします。

http://sourceforge.net/projects/jpam/files/jpam/jpam-1.1/

tar.gz を展開し、libjpam.so を取り出してどこかのディレクトリに配置します(例えば /opt/pam)。そして conf/drill-env.sh を編集して libjpam.so があるディレクトリのパスを次のように指定します。

$ vi conf/drill-env.sh
export DRILLBIT_JAVA_OPTS="-Djava.library.path=/opt/pam/"

さらに conf/drill-override.conf に次の設定を加えます。

$ vi conf/drill-override.conf
drill.exec: {
  security.user.auth: {
    enabled: true,
    packages += "org.apache.drill.exec.rpc.user.security",
    impl: "pam",
    pam_profiles: [ "sudo", "login" ]
  } 
}

これで Drillbit を再起動すれば、ユーザ認証が有効になります。

sqlline を使うクライアントからアクセスするには、次のようにユーザ名(-n オプション)とパスワード(-p オプション)を指定します。

$ sqlline –u jdbc:drill:zk=10.10.11.112:5181 –n bob –p bobdrill

もしパスワードを隠したい場合には、sqlline に入った後に !connect コマンドで接続します。

$ sqlline
sqlline> !connect jdbc:drill:zk=localhost:2181
scan complete in 1385ms
Enter username for jdbc:drill:zk=localhost:2181: bob
Enter password for jdbc:drill:zk=localhost:2181: *************

最後に、Drill の特権ユーザについて説明を加えておきます。ユーザ認証を有効にした場合、Drill の特権ユーザのみに次のような操作が許されることになります。

  • ALTER SYSTEM コマンドによるシステムレベルのオプションの変更
  • REST API もしくは Web UI によるストレージプラグイン設定の変更
  • すべてのクエリプロファイルの閲覧
  • すべての実行中のクエリのキャンセル

特権ユーザは次のうちのいずれかです。

  • システムオプション security.admin.users で指定されたユーザ(ALTER SYSTEM にて変更可能)
  • システムオプション security.admin.user_groups で指定されたグループに所属するユーザ(ALTER SYSTEM にて変更可能)
  • Drillbit を起動したユーザ

Parquet のパーティショニングによる性能最適化

この記事は Apache Drill Advent Calendar 2015 の23日目の記事です。

Drill では Parquet フォーマットを使うことによって、パーティションプルーニングによる性能上のメリットを得ることができます。パーティションプルーニングとは、アクセスするパーティションを WHERE 句の条件により自動的に絞り込む機能で、これにより処理の効率化が期待されます。

パーティションプルーニング機能を使うには、まずテーブルをパーティションに分けておく必要があります。PARTITION BY 句をつけて CTAS(CREATE TABLE AS SELECT)操作を行うことでパーティション化したテーブルを作成します。

Parquet の書き込み処理では、まずパーティションキーによるソートが行われ、パーティションカラムに新しい値を見つける毎にファイルが作られます。Drill は同じディレクトリ内にパーティション毎のファイルを作りますが、1つのファイルには1つのパーティションキーの値しか含みません。ただし、1つのパーティションキーに対してファイルが複数になることはあります。

では、例を見てみましょう。元データは最近よく使う個人情報 CSV ファイルです。

$ head -n 3 personal_information_50m.csv 
1,碓井卓也,ウスイタクヤ,0,0984963105,takuya57105@rrqec.sq,889-0507,宮崎県,延岡市,旭ケ丘,2-17,,ミヤザキケン,ノベオカシ,アサヒガオカ,2-17,,1985/08/13,29,東京都,AB,836,"FBQv,9nv"
2,脇田勇三,ワキタユウゾウ,0,0554838431,yuuzou679@ydomu.ee,400-0121,山梨県,甲斐市,牛句,4-2-12,牛句ロイヤルパレス213,ヤマナシケン,カイシ,ウシク,4-2-12,ウシクロイヤルパレス213,1978/04/28,36,沖縄県,A,862,kp1fDUPQ
3,立川円香,タチカワマドカ,1,0221247688,madoka_tachikawa@uzppymocdm.za,980-0855,宮城県,仙台市青葉区,川内澱橋通,4-1-13,川内澱橋通コーポ303,ミヤギケン,センダイシアオバク,カワウチヨドミバシドオリ,4-1-13,カワウチヨドミバシドオリコーポ303,1990/01/29,24,鹿児島県,A,881,XtFKa9kL

これを CTAS で Parquet に変換します。この例では、都道府県を格納するカラム address1 をパーティションキーに指定しています。

$ apache-drill-1.4.0/bin/sqlline -u jdbc:drill:zk=node1:5181
0: jdbc:drill:zk=node1:5181> CREATE TABLE dfs.tmp.`/personal_information/`
. . . . . . . . . . . . . .> PARTITION BY (address1)
. . . . . . . . . . . . . .> AS SELECT
. . . . . . . . . . . . . .>   CAST(columns[0] AS INT) AS id,
. . . . . . . . . . . . . .>   columns[1] AS name,
. . . . . . . . . . . . . .>   CAST(columns[3] AS INT) AS sex,
. . . . . . . . . . . . . .>   columns[4] AS phone,
. . . . . . . . . . . . . .>   columns[5] AS email,
. . . . . . . . . . . . . .>   columns[6] AS zip,
. . . . . . . . . . . . . .>   columns[7] AS address1,
. . . . . . . . . . . . . .>   columns[8] AS address2,
. . . . . . . . . . . . . .>   columns[9] AS address3,
. . . . . . . . . . . . . .>   columns[10] AS address4,
. . . . . . . . . . . . . .>   columns[11] AS address5,
. . . . . . . . . . . . . .>   TO_DATE(columns[17], 'yyyy/MM/dd') AS birth_date
. . . . . . . . . . . . . .> FROM dfs.`/personal_information_50m.csv`;
+-----------+----------------------------+
| Fragment  | Number of records written  |
+-----------+----------------------------+
| 1_4       | 5200254                    |
| 1_8       | 5200257                    |
| 1_5       | 5200239                    |
| 1_2       | 5200257                    |
| 1_7       | 5200248                    |
| 1_1       | 5277845                    |
| 1_3       | 6240297                    |
| 1_0       | 6240310                    |
| 1_6       | 6240293                    |
+-----------+----------------------------+
9 rows selected (557.927 seconds)

実行結果のテーブルを見ると、9つの Minor Fragment が並列にデータを書き出したことが読み取れます。出力先のディレクトリには以下のようにファイルができています。ファイル名先頭の「1_x」は書き出し処理を行った Minor Fragment の番号、次の「_yy」はパーティションの番号です(ここでは都道府県でパーティショニングしたので48個に分かれていますね)。

$ ls tmp/personal_information/
1_0_1.parquet   1_1_45.parquet  1_3_37.parquet  1_5_29.parquet  1_7_20.parquet
1_0_10.parquet  1_1_46.parquet  1_3_38.parquet  1_5_3.parquet   1_7_21.parquet
1_0_11.parquet  1_1_47.parquet  1_3_39.parquet  1_5_30.parquet  1_7_22.parquet
1_0_12.parquet  1_1_48.parquet  1_3_4.parquet   1_5_31.parquet  1_7_23.parquet
1_0_13.parquet  1_1_5.parquet   1_3_40.parquet  1_5_32.parquet  1_7_24.parquet
1_0_14.parquet  1_1_6.parquet   1_3_41.parquet  1_5_33.parquet  1_7_25.parquet
...

元の CSV と Parquet のファイルサイズを比べると、Parquet ファイルの圧縮効果が一目瞭然です。Parquet おそるべし!

$ du -sm personal_information_50m.csv tmp/personal_information/
12308	personal_information_50m.csv
3258	tmp/personal_information/

では WHERE 句の条件つきのクエリを実行してみます。

0: jdbc:drill:zk=node1:5181> SELECT name, address1, address2, address3, address4, address5, birth_date
. . . . . . . . . . . . . .> FROM dfs.tmp.`/personal_information/`
. . . . . . . . . . . . . .> WHERE address1 = _UTF16'神奈川県' AND
. . . . . . . . . . . . . .>       address2 = _UTF16'横須賀市' AND
. . . . . . . . . . . . . .>       birth_date > '1985-01-01'
. . . . . . . . . . . . . .> LIMIT 5;
+--------+-----------+-----------+-----------+-----------+---------------+-------------+
|  name  | address1  | address2  | address3  | address4  |   address5    | birth_date  |
+--------+-----------+-----------+-----------+-----------+---------------+-------------+
| 高見善次郎  | 神奈川県      | 横須賀市      | 馬堀海岸      | 2-5-10    |               | 1987-06-13  |
| 福田渚    | 神奈川県      | 横須賀市      | 船越町       | 2-12-8    | 船越町ステーション105  | 1986-11-24  |
| 辻清作    | 神奈川県      | 横須賀市      | 平和台       | 2-11      | 平和台テラス202     | 1985-09-20  |
| 河田果凛   | 神奈川県      | 横須賀市      | 久村        | 2-18      | ザ久村305        | 1991-08-23  |
| 尾上謙治   | 神奈川県      | 横須賀市      | グリーンハイツ   | 1-17-16   |               | 1992-04-05  |
+--------+-----------+-----------+-----------+-----------+---------------+-------------+
5 rows selected (31.381 seconds)

肝心なのは、実際にどんなプランが実行されたかです。EXPLAIN PLAN FOR もしくは Drill Web UI でクエリプランを見てみます。するとスキャン対象のファイルが「1_x_34.parquet」という名前の 9 つのみであることがわかります。「神奈川県」をキーとするデータは 34 番のパーティションに入っており、スキャン対象がきちんと絞り込まれていることを確認できました。

00-00    Screen
00-01      Project(name=[$0], address1=[$1], address2=[$2], address3=[$3], address4=[$4], address5=[$5], birth_date=[$6])
00-02        SelectionVectorRemover
00-03          Limit(fetch=[5])
00-04            UnionExchange
01-01              SelectionVectorRemover
01-02                Limit(fetch=[5])
01-03                  Project(name=[$3], address1=[$0], address2=[$1], address3=[$4], address4=[$5], address5=[$6], birth_date=[$2])
01-04                    SelectionVectorRemover
01-05                      Filter(condition=[AND(=($0, _UTF-16LE'神奈川県'), =($1, _UTF-16LE'横須賀市'), >($2, '1985-01-01'))])
01-06                        Project(address1=[$4], address2=[$6], birth_date=[$2], name=[$0], address3=[$5], address4=[$1], address5=[$3])
01-07                          Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=/tmp/personal_information/1_1_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_5_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_2_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_6_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_0_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_4_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_8_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_3_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_7_34.parquet]], selectionRoot=maprfs:/tmp/personal_information, numFiles=9, usedMetadataFile=false, columns=[`address1`, `address2`, `birth_date`, `name`, `address3`, `address4`, `address5`]]])

以前の記事でも解説したように、Parquet は Column-oriented フォーマットであるため、各ファイルのスキャンにおいても SELECT 対象のカラムの部分に絞り込んで読み込みを行っています。圧縮効果によるファイルサイズの縮小とも合わせ、大きなパフォーマンス向上の効果が見込めるでしょう。

Drill Web UI のビジュアルなクエリプロファイル

この記事は Apache Drill Advent Calendar 2015 の22日目の記事です。

Drill のパフォーマンスチューニングに役立つ情報の一つはクエリプラン、そしてもう一つはクエリプロファイルです。今回はクエリプロファイルでどんな情報が見られるかを紹介していきましょう。

今回使うサンプルデータはこんな感じです。まずは 5,000万人分の個人情報が入った CSV ファイル約13GB(ダミー情報です)。

$ head -n 3 personal_information_50m.csv 
1,碓井卓也,ウスイタクヤ,0,0984963105,takuya57105@rrqec.sq,889-0507,宮崎県,延岡市,旭ケ丘,2-17,,ミヤザキケン,ノベオカシ,アサヒガオカ,2-17,,1985/08/13,29,東京都,AB,836,"FBQv,9nv"
2,脇田勇三,ワキタユウゾウ,0,0554838431,yuuzou679@ydomu.ee,400-0121,山梨県,甲斐市,牛句,4-2-12,牛句ロイヤルパレス213,ヤマナシケン,カイシ,ウシク,4-2-12,ウシクロイヤルパレス213,1978/04/28,36,沖縄県,A,862,kp1fDUPQ
3,立川円香,タチカワマドカ,1,0221247688,madoka_tachikawa@uzppymocdm.za,980-0855,宮城県,仙台市青葉区,川内澱橋通,4-1-13,川内澱橋通コーポ303,ミヤギケン,センダイシアオバク,カワウチヨドミバシドオリ,4-1-13,カワウチヨドミバシドオリコーポ303,1990/01/29,24,鹿児島県,A,881,XtFKa9kL

そしてゆるキャラ情報が入った JSON ファイル約1KB。

$ cat yuru.json 
{
  "address1":"熊本県",
  "address2":"",
  "character":"くまモン",
}
{
  "address1":"千葉県",
  "address2":"船橋市",
  "character":"ふなっしー",
}
{
  "address1":"奈良県",
  "address2":"",
  "character":"せんとくん"
}
...

これを3ノードからなる Hadoop分散ファイルシステムに配置し、各ノードでは Drillbit を稼働しておきます。クエリは、ゆるキャラごとにその住所に住んでいる人数をカウントするというシンプルな処理です。

SELECT
  b.`character` AS `ゆるキャラ`,
  b.address1 as `都道府県`,
  b.address2 as `市区町村`,
  count(*) as `人数`
FROM
  dfs.`/personal_information_50m.csv` AS a
JOIN
  dfs.`/yuru.json` AS b
ON
  (b.address1 = a.columns[7]) AND
  (b.address2 = a.columns[8] OR b.address2 = '') AND
  b.`character` <> ''
GROUP BY b.`character`, b.address1, b.address2;

まず Drill の Web UI の「Query」タブでこの SQL を Submit します。

f:id:nagixx:20151222192355p:plain

クエリの実行が始まり、しばらくすると結果が表示されます。

f:id:nagixx:20151222192744p:plain

ここで「Profiles」タブをクリックすると、完了したクエリの一覧が表示されます。ただし、表示されるクエリはアクセスしている Web UI のノードが Foreman(クエリを受け付けたノード)となったクエリだけですので、それが必ずしもクエリを投入したノードになるとは限らないことにご注意を。対象のクエリが表示されなかったら、ほかの Drillbit ノードの UI も見てみましょう。

f:id:nagixx:20151222193619p:plain

ここで対象のクエリをクリックすると、そのクエリのプロファイル情報が表示されます。ここには大きく分けて次のような情報があります。

  • クエリとそのプラン(Query and Planning)
  • クエリのプロファイル(Query Profile)
  • Fragment のプロファイル(Fragment Profiles)
  • オペレータのプロファイル(Operator Profiles)
  • 完全な JSON 形式のプロファイル(Full JSON Profile)

f:id:nagixx:20151222193855p:plain

これらを見ていく前に、プランをビジュアルに表示する「Visualized Plan」タブの内容が便利なので、まず見てみます。ここではプランの非循環有向グラフ(DAG)が色分けされて表示されます。実際の処理は下から上に向けて流れていきます。

グラフの各ノードはスキャン、フィルタ、集約などの処理を表すオペレータで、Major Fragment ごとに異なる色が付けられています。よくみると、Major Fragment はクラスタノード間でデータの交換が行われる Exchange オペレータを境に分割されていることがわかります。

また、各ノードについている番号は、前回の記事でも出てきた <Major Fragment ID>-<Operator ID> を示しています。

f:id:nagixx:20151222195038p:plain

さて、Fragment Profiles の欄に目を移すと、各 Major Fragment の処理がどのように並列化され、各処理にどれだけ時間がかかったかがわかるタイムラインの表示があります。それぞれの色は Visualized Plan の Major Fragment の色に対応しており、並列度が大きいほど上下方向の幅が太く表示されることになります。

下の例では、水色の Major Fragment 04 の処理は一瞬で完了し、緑色の Major Fragment 03 の処理はある程度の並列度でしばらく進んだ後、若干のばらつきがありつつ完了していることがわかります。もしこの Major Fragment の表示が極端な段差を示している場合には、データの偏りやデータローカリティの問題が疑われます。

f:id:nagixx:20151223030602p:plain

さらにもう少し詳細に Major Fragment の内容を見ていくには、Fragment Profiles の「Major Fragment: <Major Fragment ID>-xx-xx」と書かれた帯をクリックすると、詳細が展開表示されます。

各行の先頭についている番号は <Major Fragment ID>-<Minor Fragment>-xx を表しています。Minor Fragment というのは、Major Fragment を並列化した一つ一つの処理のことです。各 Minor Fragment の実行ノードや処理時間、処理レコード数、ピークメモリ量などを見ることができます。

下の例では、Major Fragment 03 が 9 並列に分割され、3ノードに均等に割り当てられていることがわかります。

f:id:nagixx:20151223031816p:plain

ところで上記の Minor Fragment の情報では並列に実行される様子がわかりましたが、Minor Fragment 内に含まれている個々のオペレータがどれくらいのリソースを使って実行されたかまではわかりません。これを確認するためには Operator Profiles に注目します。

各行の先頭についている番号は <Major Fragment ID>-xx-<Operator ID> です。そして各オペレータのセットアップ時間、処理時間、待ち時間、ピークメモリ量などを見ることができます。ここではオペレータに焦点を置いているため Minor Fragment の区別はしていませんが、最小値、平均値、最大値は Minor Fragment 間で集約した結果の値です。

f:id:nagixx:20151223033352p:plain

もしさらに詳細な Minor Fragment ごとのオペレータのプロファイルが欲しい場合は、Operator Profiles の「<Major Fragment ID>-xx-<Operator ID> - <Operator Name>」が書かれた帯をクリックします。すると、<Major Fragment ID>-<Minor Fragment>-<Operator ID> のレベルまで細分化されたプロファイルを得ることができます。

f:id:nagixx:20151223034516p:plain

クエリ処理のどの部分でどれだけ時間やリソースを消費しているかを把握することは、パフォーマンスチューニングの基本中の基本ですので、まずはこの UI をマスターしましょう。

クエリプランの見かたとハッキングの方法

この記事は Apache Drill Advent Calendar 2015 の19日目の記事です。

SQL が動いて正しい結果が返ってくればよい、というのであれば必要ないのですが、パフォーマンスが気になり始めたらクエリプランとプロファイルを調べていく必要が出てきます。

SQL が発行されると、Drill はクラスタ構成やデータソースに関する統計情報をヒントに、これを論理プラン→物理プラン→実行プランに展開していきます。Drill のクエリの実行の様子については下記の記事を参考にしてください。

クエリの論理プランと物理プランは、EXPLAIN コマンドで確認することができます。ただし、実行プランについては見ることができませんので、確認したい場合には Drill の Web UI でクエリプロファイルを見る必要があります。

さて、まず論理プランは SQL のオペレータを Drill のパーサが論理オペレータに変換することで作られます。ここでのポイントは、論理プランには実際のクラスタ構成やデータに基づく並列化や最適化などが含まれないという点です。

下記は論理プランを表示した例です。EXPLAIN に WITHOUT IMPLEMENTATION をつけることで論理プランの表示になります。下から上の方向に、JSON ファイルのスキャン、フィルタリング、グループ集計、プロジェクションが順に行われることが分かります。

0: jdbc:drill:zk=local> EXPLAIN PLAN WITHOUT IMPLEMENTATION FOR SELECT type t, COUNT(DISTINCT id) FROM dfs.`/tmp/donuts.json` WHERE type='donut' GROUP BY type;
+------------+------------+
|   text    |   json    |
+------------+------------+
| DrillScreenRel
  DrillProjectRel(t=[$0], EXPR$1=[$1])
    DrillAggregateRel(group=[{0}], EXPR$1=[COUNT($1)])
    DrillAggregateRel(group=[{0, 1}])
        DrillFilterRel(condition=[=($0, 'donut')])
        DrillScanRel(table=dfs, /tmp/donuts.json, groupscan=[EasyGroupScan [selectionRoot=/tmp/donuts.json, numFiles=1, columns=[`type`, `id`], files=[file:/tmp/donuts.json]]]) | {
  "head" : {
    "version" : 1,
    "generator" : {
    "type" : "org.apache.drill.exec.planner.logical.DrillImplementor",
    "info" : ""
    },
    "type" : "APACHE_DRILL_LOGICAL",
    "options" : null,
    "queue" : 0,
    "resultMode" : "LOGICAL"
  }, ...

一方、物理プランは Drill のオプティマイザが様々なルールを元にオペレータや関数の再構成を行った後の最適化されたプランです。物理プランはクエリの実行パフォーマンスに大きく影響を与えるため、意図通りに物理プランが作成されているか、もしくはさらに最適化できる余地がないかどうかを探る際に、とても重要です。さらに、物理プランの一部を変更してクエリを再実行してみる、ということも可能です。

物理プランを EXPLAIN で表示してみましょう。今度は WITHOUT IMPLEMENTATION をつけていません。

0: jdbc:drill:zk=local> EXPLAIN PLAN FOR SELECT type t, COUNT(DISTINCT id) FROM dfs.`/tmp/donuts.json` WHERE type='donut' GROUP BY type;
+------------+------------+
|   text    |   json    |
+------------+------------+
| 00-00 Screen
00-01   Project(t=[$0], EXPR$1=[$1])
00-02       Project(t=[$0], EXPR$1=[$1])
00-03       HashAgg(group=[{0}], EXPR$1=[COUNT($1)])
00-04           HashAgg(group=[{0, 1}])
00-05           SelectionVectorRemover
00-06               Filter(condition=[=($0, 'donut')])
00-07               Scan(groupscan=[EasyGroupScan [selectionRoot=/tmp/donuts.json, numFiles=1, columns=[`type`, `id`], files=[file:/tmp/donuts.json]]]) ...

各行の先頭には

<Major Fragment ID>-<Operator ID>

という番号が割り振られています。Major Fragment というのは、クエリ実行のフェーズを表す抽象的な概念です。Major Fragment には1つ以上のオペレーターが含まれますが、Major Fragment 自体がタスクを実行するわけではありません。Drill のクエリプランでは、クラスタノード間でデータの交換が必要になるポイントで Major Fragment が分けられます。オペレータはフィルタリングやハッシュ集約などの処理そのものです。

Drill Query Execution: Major Fragments
https://drill.apache.org/docs/drill-query-execution/#major-fragments

さらに、より詳細な調査が必要な場合には、INCLUDING ALL ATTRIBUTES をつけることでクエリプランの選択の根拠として使われたコスト情報に関する情報が表示されます。予想される処理行数、CPU、メモリ、ディスク I/O、ネットワーク I/O リソースの見積もりを確認することで、そのクエリが効率的に実行されるかどうかの判断の助けになります。

0: jdbc:drill:zk=local> EXPLAIN PLAN INCLUDING ALL ATTRIBUTES FOR SELECT * FROM dfs.`/tmp/donuts.json` WHERE type='donut';
+------------+------------+
|   text    |   json    |
+------------+------------+
| 00-00 Screen: rowcount = 1.0, cumulative cost = {5.1 rows, 21.1 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 889
00-01   Project(*=[$0]): rowcount = 1.0, cumulative cost = {5.0 rows, 21.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 888
00-02       Project(T1¦¦*=[$0]): rowcount = 1.0, cumulative cost = {4.0 rows, 17.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 887
00-03       SelectionVectorRemover: rowcount = 1.0, cumulative cost = {3.0 rows, 13.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 886
00-04           Filter(condition=[=($1, 'donut')]): rowcount = 1.0, cumulative cost = {2.0 rows, 12.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 885
00-05           Project(T1¦¦*=[$0], type=[$1]): rowcount = 1.0, cumulative cost = {1.0 rows, 8.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 884
00-06               Scan(groupscan=[EasyGroupScan [selectionRoot=/tmp/donuts.json, numFiles=1, columns=[`*`], files=[file:/tmp/donuts.json]]]): rowcount = 1.0, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 883 ...

例えば、前回の記事ではジョインの最適化について書きましたが、Distributed Join が選択されるのか、Broadcast join が選択されるのか、その選択の基準になったコスト情報はどうか、ということをクエリプランから読み取ることが可能です。

最後に、Drill が生成した物理プランを変更し再実行するハッキングの方法を紹介します。パフォーマンスの向上を模索するときに、一部のオペレータを変えてみたらどうなるのか、を確かめたいときに役に立ちます。

  1. EXPLAIN PLAN FOR をクエリの先頭につけて、物理プランを表示する
  2. 物理プランの JSON 出力をコピーして、直接プランを望むように編集する
  3. Drill の Web UI(http:// <Drillbit が動作するノード>:8047)にアクセスする
  4. メニューバーから Query を選択する
  5. Query Type のラジオボタンで PHYSICAL を選択する
  6. Query 欄に編集した物理プランをペーストして Submit をクリックする

f:id:nagixx:20151220005054p:plain

Drill 内部のジョインストラテジー

この記事は Apache Drill Advent Calendar 2015 の18日目の記事です。

一般的な RDMBS のジョインアルゴリズムには、代表的なものとして ネストループ結合、マージ結合、ハッシュ結合などがあります。それぞれレコードへのアクセス方法や順序などが異なっており、最適なアルゴリズムを使うことでジョイン処理を効率化することができます。このアルゴリズムの選択は、通常はクエリオプティマイザの仕事です。

一方、分散型の SQL 処理エンジンでは、データが物理的に複数ノードに分散されているため、上記のジョインアルゴリズムとは別の軸として、分散環境特有のジョインプランを考える必要があります。

Drill では、「分散ジョイン (Distributed Join)」と「ブロードキャストジョイン(Broadcast Join)」の2つの選択肢があります。実際、商用の分散RDBMS(Nettezza、Vertica、Greenplum、Redshift など)や SQL on Hadoop(Hive、Impala、Presto など)でも同様のジョインの最適化が行われており、すでにおなじみの手法です。

下記は Drill と他の SQL エンジンでの呼び方を並べたものですが、本質的にはそれぞれ同じプランを表しています。

Drill での呼び方他の SQL エンジンでの呼び方
Distributed Join Shuffle Join、Common Join
Broadcast Join Map Join、Fragment Replicate Join

Distributed Join

Distributed Join においては、両方のテーブルの各レコードが、結合キーのハッシュ値に基づいてノード間に再分散されます。同じハッシュ値を持つレコード、つまり同じ結合キーをもつレコードは同じノードに存在するようになるはずです。

この「ハッシュ分散」オペレーションが行われた後、マージ結合アルゴリズムであれば両テーブルの各レコードはソートされ、ジョインが行われます。ハッシュ結合アルゴリズムであればソートは不要です。ただし、ネストループ結合の場合には Distributed Join を行うことはできません。

Broadcast Join

Broadcast Join では、ジョインが行われる前に片方のテーブルのデータ全体が、すべてのノードにブロードキャストされます。具体的には、内部表側のデータがブロードキャストされる一方、外部表側のデータはそのままで移動させません。

Broadcast Join は大きいファクトテーブル(トランザクションテーブル)と小さいディメンジョンテーブル(マスターテーブル)をジョインするような場合に便利です。大きなファクトテーブルのハッシュ分散はネットワークに負荷をかけるため、小さなディメンジョンテーブルのブロードキャストのほうがより効率的です。ただし、すべてのノードに同じデータを送るため、クラスタの構成やデータのサイズによっては最適にならない場合もあります。

Broadcast Join はネストループ結合、マージ結合、ハッシュ結合のいずれのジョインアルゴリズムにも対応します。

ジョインのタイプネストループ結合マージ結合ハッシュ結合
Distributed Join 不可
Broadcast Join

では、クエリオプティマイザはどのように Distributed Join、Broadcast Join を選択するのでしょうか。まず、内部表の行数がプロパティ planner.broadcast_threshold が表す閾値を超えていれば、Broadcast Join の候補となりえます。それに加えて、クラスタ構成やデータサイズを元にコストの計算が行われ、どのプランが効率的かの判断が行われます。

planner.broadcast_threshold を含め、選択に影響を与えるプロパティを下記に示します。

プロパティ説明デフォルト値
planner.broadcast_factor ブロードキャストのコストを計算する際の係数。小さくすると、Broadcast Join がより選択されやすくなる 1
planner.enable_broadcast_join Broadcast Join を有効にする true
planner.broadcast_threshold Broadcast Join の基準となる行数のしきい値。内部表の行数がこの値を上回る(と予想される)場合は Broadcast Join は選択されない 10000000