ネスト構造のJSONデータにApache Drillで直接SQLクエリをかける

この記事は Spark, SQL on Hadoop etc. Advent Calendar 2014 の18日目の記事です。

Drill のここがすごい

先日 Apache Drill も晴れて Apache Software Foundation のトップレベルプロジェクトになりまして、来年初頭のバージョン1.0リリースに向けて機能が追加され品質も徐々に高まってきています。さて、Drillとはどういうものかについては去年のHadoopアドベントカレンダーの記事でも触れました。

が、改めてDrillの特徴と他のSQL-on-Hadoopプロジェクトとどういうところが違うのかについて説明しましょう。

速攻使える (Agility)

リレーショナルデータベースの世界でも、データウェアハウスの世界でも、そして近年のHadoopのような分散処理プラットフォームの世界でも、データ分析を行う上での大きな課題は、いかに迅速に必要なデータを集めて処理に必要な準備を整えるか、ということでした。大抵の場合、データは複数の場所に散在しておりそれぞれのデータのスキーマ管理も別々に行われているため、データを1箇所に移動してスキーマの整合性を取り、フォーマットを揃え・・・といった作業はものすごく骨が折れます。よく分析を専門としている方々が、分析プロセスの8割9割は前処理だ、なんて言っているところの結構な部分を占めているのがこれです。

この課題をいかに解決するか、というのがDrillが生まれた背景の一つです。このためDrillは専用のデータストアやスキーマリポジトリを持たず、ファイルシステム、Hive、HBase、MongoDBといった既存のデータストアに接続し、格納されているデータに対しネイティブフォーマットのまま直接処理を行います。Drillはクエリ処理を行うエンジンのヘッドのみを持ち、既存のスキーマをもつデータストアはそれをそのまま活用、自己記述型のデータやスキーマレスなフラットファイルはクエリ実行時にスキーマを動的に適用するしくみになっています。つまり、分析のための事前準備を極力省くことを可能にし、思い立ったらすぐにデータの探索を始められるようになるのです。

柔軟 (Flexibility)

SQLといえば通常は表形式のデータに対して処理を行うものというイメージがありますが、Drillは複雑な構造のデータにも対応するためJSONに似た構造の内部形式でデータを扱っており、ネスト構造やアレイ構造、さらにデータの構成が動的に変化することも許容しています。そしてそのような複雑な構造のデータをアプリケーションから操作できるように、SQLを拡張して柔軟に処理を行えるインターフェースも追加しています。

他のSQL-on-Hadoopプロジェクトは基本的にそれぞれの独自のデータストアに最適化された形式で事前にロードすることを想定しており、いったん格納してしまうとデータ構造を変更するのが難しい面があります。一方Drillはクエリ実行時に動的に対応するため、遥かに高い柔軟性を備えています。

見たことある感じ (Familiarity)

いくら技術が優れていても、それが使いにくければやがて誰も触らなくなります。DrillではHadoopや分散処理プラットフォームに詳しくないユーザでも使いやすいようにANSI標準SQLに対応し、JDBC/ODBC経由であれば使い慣れた既存のBIツールや可視化ツールを通じても利用可能です。また、DrillはHiveとの親和性も高く、Hiveが対応しているファイルフォーマットはすべてサポートするほか、Hive UDFも再利用可能です。

ネスト構造のJSONデータにSQLクエリをかけてみる

さて、数日前に @ka4geru さんも Advent Calendar 13日目の記事でDrillを使ったJSON形式のデータ処理について書かれておりましたが、本記事ではフラットなJSONデータではなく、もう少し複雑なネスト構造のJSONの扱いについて見てみましょう。Drillの環境にはやはりお手軽な MapR の SandboxVMwareまたはVirtualBox仮想マシンイメージ)を使用します。

Drillは最新の0.6.0.r2が入っています。

[user01@maprdemo ~]$ rpm -q mapr-drill
mapr-drill-0.6.0.28642.r2-1.noarch

JSONデータは、私ががんばって構造化したゆるキャラデータを使用します。ゆるキャラの名前、都道府県、市区町村、投票の獲得票数が入っています。

[user01@maprdemo ~]$ hadoop fs -cat /yuru/grandprix2014.json
{
"title":"ゆるキャラグランプリ2014",
"date":"2014-11-03",
"url":"http://www.yurugp.jp",
"data":{
"chars":[
{"name":"ぐんまちゃん","prefecture":"群馬県","city":"","point":"1002505"},
{"name":"ふっかちゃん","prefecture":"埼玉県","city":"深谷市","point":"835981"},
{"name":"みきゃん","prefecture":"愛媛県","city":"","point":"749911"},
{"name":"しんじょう君","prefecture":"高知県","city":"須崎市","point":"525306"},
  {"name":"チャチャ王国のおうじちゃま","prefecture":"京都府","city":"宇治市","point":"522704"},
{"name":"与一くん","prefecture":"栃木県","city":"大田原市","point":"501122"},
{"name":"しまねっこ","prefecture":"島根県","city":"","point":"469791"},
{"name":"とち介","prefecture":"栃木県","city":"栃木市","point":"469684"},
  {"name":"あゆコロちゃん","prefecture":"神奈川県","city":"厚木市","point":"428061"},
  {"name":"しっぺい","prefecture":"静岡県","city":"磐田市","point":"404123"}
]
}
}

これを使い、まずは単純にこのJSONファイルをデータソースに指定して、SELECT * をしてみましょう。フロントエンドのsqllineを起動して、SELECT文のFROMには分散ファイルシステムを表す「dfs」に続いてJSONファイルのフルパスを指定します。

[user01@maprdemo ~]$ sqlline
sqlline version 1.1.6
0: jdbc:drill:> SELECT * FROM dfs.`/yuru/grandprix2014.json`;
+------------+------------+------------+------------+
| title | date | url | data |
+------------+------------+------------+------------+
| ゆるキャラグランプリ2014 | 2014-11-03 | http://www.yurugp.jp | {"chars":[{"name":"ぐんまちゃん","prefecture":"群馬県","city":"","point":"1002505"},{"name":"ふっかちゃん","prefecture":"埼玉県","city":"深谷市","point":"835981"},{"name":"みきゃん","prefecture":"愛媛県","city":"","point":"749911"},{"name":"しんじょう君","prefecture":"高知県","city":"須崎市","point":"525306"},{"name":"チャチャ王国のおうじちゃま","prefecture":"京都府","city":"宇治市","point":"522704"},{"name":"与一くん","prefecture":"栃木県","city":"大田原市","point":"501122"},{"name":"しまねっこ","prefecture":"島根県","city":"","point":"469791"},{"name":"とち介","prefecture":"栃木県","city":"栃木市","point":"469684"},{"name":"あゆコロちゃん","prefecture":"神奈川県","city":"厚木市","point":"428061"},{"name":"しっぺい","prefecture":"静岡県","city":"磐田市","point":"404123"}]} |
+------------+------------+------------+------------+
1 row selected (2.194 seconds)

そうすると、JSONのトップレベルの項目はカラムとして出てきますが、dataカラムは巨大な1フィールドとしてしか見えていません。ではdataの中身に対してクエリをかけるにはどうすればよいでしょうか。

ここではDrillで導入されたFLATTEN()関数を使います。FLATTEN()は繰り返されているフィールドを個別のレコードに分解し、その他のフィールドのカラムはそれぞれの新しいレコードに値がコピーされます。下の例では、data.chars内の繰り返しがレコードに展開されました。

0: jdbc:drill:> SELECT FLATTEN(t.data.chars) AS char FROM dfs.`/yuru/grandprix2014.json` t;
+------------+
| char |
+------------+
| {"name":"ぐんまちゃん","prefecture":"群馬県","city":"","point":"1002505"} |
| {"name":"ふっかちゃん","prefecture":"埼玉県","city":"深谷市","point":"835981"} |
| {"name":"みきゃん","prefecture":"愛媛県","city":"","point":"749911"} |
| {"name":"しんじょう君","prefecture":"高知県","city":"須崎市","point":"525306"} |
| {"name":"チャチャ王国のおうじちゃま","prefecture":"京都府","city":"宇治市","point":"522704"} |
| {"name":"与一くん","prefecture":"栃木県","city":"大田原市","point":"501122"} |
| {"name":"しまねっこ","prefecture":"島根県","city":"","point":"469791"} |
| {"name":"とち介","prefecture":"栃木県","city":"栃木市","point":"469684"} |
| {"name":"あゆコロちゃん","prefecture":"神奈川県","city":"厚木市","point":"428061"} |
| {"name":"しっぺい","prefecture":"静岡県","city":"磐田市","point":"404123"} |
+------------+
10 rows selected (0.308 seconds)

さらにこれをカラムに分解しましょう。こうすると、普通のテーブル風にアクセスできるようになりますね。ちなみに`char`にバッククォートがついているのは、charは予約語のため、名前と区別するためです。

0: jdbc:drill:> SELECT
. . . . . . . > t2.`char`.name AS ゆるキャラ,
. . . . . . . > t2.`char`.prefecture AS 都道府県,
. . . . . . . > t2.`char`.city AS 市区町村,
. . . . . . . > t2.`char`.point AS ポイント
. . . . . . . > FROM (
. . . . . . . > SELECT FLATTEN(t1.data.chars) AS `char` FROM dfs.`/yuru/grandprix2014.json` t1
. . . . . . . > ) t2;
+------------+------------+------------+------------+
| ゆるキャラ | 都道府県 | 市区町村 | ポイント |
+------------+------------+------------+------------+
| ぐんまちゃん | 群馬県 | | 1002505 |
| ふっかちゃん | 埼玉県 | 深谷市 | 835981 |
| みきゃん | 愛媛県 | | 749911 |
| しんじょう君 | 高知県 | 須崎市 | 525306 |
| チャチャ王国のおうじちゃま | 京都府 | 宇治市 | 522704 |
| 与一くん | 栃木県 | 大田原市 | 501122 |
| しまねっこ | 島根県 | | 469791 |
| とち介 | 栃木県 | 栃木市 | 469684 |
| あゆコロちゃん | 神奈川県 | 厚木市 | 428061 |
| しっぺい | 静岡県 | 磐田市 | 404123 |
+------------+------------+------------+------------+
10 rows selected (0.263 seconds)

では今度は2014年分だけでなく、2013年分、2012年分の複数JSONファイルを用意します。

[user01@maprdemo ~]$ hadoop fs -ls /yuru
Found 3 items
-rw-r--r-- 1 user01 mapr 1079 2014-12-18 08:02 /yuru/grandprix2012.json
-rw-r--r-- 1 user01 mapr 1078 2014-12-18 07:54 /yuru/grandprix2013.json
-rw-r--r-- 1 user01 mapr 1095 2014-12-18 07:56 /yuru/grandprix2014.json

で、FROMのデータソースに指定するパスはファイルではなく、ファイルが格納されているディレクトリです。こうすることにより、ディレクトリ配下のファイル全てを1つのデータソースとして扱うことが可能です。

そして3年分のデータから各ゆるキャラの合計票数を求め、ランキングにしてみます。

0: jdbc:drill:> SELECT
. . . . . . . > t2.`char`.name AS ゆるキャラ,
. . . . . . . > SUM(CAST(t2.`char`.point AS INT)) AS 合計
. . . . . . . > FROM (
. . . . . . . > SELECT FLATTEN(t1.data.chars) AS `char` FROM dfs.`/yuru` t1
. . . . . . . > ) t2
. . . . . . . > GROUP BY t2.`char`.name
. . . . . . . > ORDER BY 合計 DESC;
+------------+------------+
| ゆるキャラ | 合計 |
+------------+------------+
| ぐんまちゃん | 2015806 |
| ふっかちゃん | 1420018 |
| さのまる | 1367513 |
| 出世大名家康くん | 1285684 |
| しまねっこ | 955209 |
| あゆコロちゃん | 954866 |
| 与一くん | 908090 |
| ちょるる | 847997 |
| しっぺい | 762442 |
| みきゃん | 749911 |
| バリィさん | 547284 |
| しんじょう君 | 525306 |
| チャチャ王国のおうじちゃま | 522704 |
| とち介 | 469684 |
| かわりみ千兵衛 | 361104 |
| やなな | 127820 |
| 滝ノ道ゆずる | 114600 |
+------------+------------+
17 rows selected (0.807 seconds)

続いて、今度はオンライン投票で出力された(つもりの)、CSV形式のログファイルを持ってきました。以下のように、タイムスタンプ、ユーザID、ゆるキャラ名がカンマ区切りで格納されています。

$ hadoop fs -cat vote.csv | head -n 5
2014-10-04 15:01:43.231,3498645938221,さのまる
2014-10-04 15:01:57.734,1348062483024,バリィさん
2014-10-04 15:02:33.656,2309123985276,しんじょう君
2014-10-04 15:03:18.674,3428705610523,ぐんまちゃん
2014-10-04 15:03:31.530,2309868237482,あゆコロちゃん

このログと、先ほどの2014年のゆるキャラデータをもとに、「2014年10月4日17時以降に投票された票数」を求めてみましょう。下のSQLではCSVログとJSONデータをゆるキャラ名をキーにしてジョインし、時刻の条件をつけてフィルタをかけた後に並び替えています。

0: jdbc:drill:> SELECT
. . . . . . . > t1.columns[2] AS ゆるキャラ,
. . . . . . . > count(*) AS カウント
. . . . . . . > FROM dfs.`/vote.csv` AS t1
. . . . . . . > JOIN (SELECT FLATTEN(t2.data.chars) AS `char` FROM dfs.`/yuru/grandprix2014.json` t2) t3
. . . . . . . > ON t1.columns[2] = CAST(t3.`char`.name as VARCHAR(20)) AND
. . . . . . . > t1.columns[0] > '2014-10-04 17:00:00.000'
. . . . . . . > GROUP BY t1.columns[2];
. . . . . . . > ORDER BY カウント DESC;
+------------+------------+
| ゆるキャラ | カウント |
+------------+------------+
| ぐんまちゃん | 1002 |
| ふっかちゃん | 781 |
| みきゃん | 691 |
| しんじょう君 | 506 |
| チャチャ王国のおうじちゃま | 505 |
| 与一くん | 478 |
| しまねっこ | 369 |
| とち介 | 354 |
| あゆコロちゃん | 261 |
| しっぺい | 223 |
+------------+------------+
10 rows selected (1.043 seconds)

以上、ネスト構造のJSONデータにいろいろなSQLをかけてみました。このように、Drillを使えば事前にフォーマット変換やデータのロードを行うことなく、素早く柔軟にクエリをかけられることがお分かりいただけたのではと思います。

全国のドリラーの皆様もぜひ、レッツドリる!

Apache Drill ではクエリ処理はこんな風に行われる(Hadoop アドベントカレンダー 2013 16日目)

この記事は Hadoop アドベントカレンダー 2013の16日目の記事です。

Apache Drill 概要

f:id:nagixx:20131216193805p:plain

Apache DrillGoogle Dremel に触発されて開発されたオープンソースプロジェクトで、2012年8月に Apache Incubator として提案されました。大規模データに対し、バッチ処理ではなく、インタラクティブなクエリの実行を可能にするという意味では、ImpalaやStinger、Presto といったプロジェクトと同様の大きな目的を持っていますが、SQL 2003 完全準拠、パーサや処理実行部分を Pluggable にして幅広いクエリ言語やデータソースに対応、JSON/Avro のようなネストデータに対応、スキーマはオプショナルでスキーマレスなデータに対応、といったところなど、より柔軟で拡張性の高いフレームワークを目指しているのが他のプロジェクトとの違いです。開発の中心は MapR Technologies ですが、開発途上で OpenDremel プロジェクトが合流し、業界他社の複数のエンジニアの参画を得ています。

f:id:nagixx:20131216113825p:plain

上記の図が Drill の構成要素ですが、処理対象は HDFS 上のファイルだけではなく、HBase や Cassandra、MongoDB なども念頭に置かれていることがわかります。

説明の前に

本稿では Drill のクエリ処理のフローを説明しようと思いますが、その前に Drill で使われているオープンソースのソフトウェアコンポーネントや Drill 特有のコンポーネントについて、簡単に説明しておきましょう。

Apache Drill では、いろいろなオープンソースのソフトウェアコンポーネントが使われています。主なものを簡単に紹介します。

SQLLine
http://sqlline.sourceforge.net

リレーショナルデータベースの接続とSQLの実行のための、ピュア Java のコンソールユーティリティです。Oracle の sqlplus、MySQLmysql に相当する機能を提供するといえばわかりやすいかと思います。

Optiq
http://www.hydromatic.net/optiq/

JDBC サーバ、SQL パーサ、クエリオプティマイザを含むデータ管理フレームワークです。データソース、処理リソース、クライアントの中間に位置し、拡張可能なオペレータや変換ルールを追加することにより、様々なデータに対して柔軟なクエリ処理を実行することができるようになります。MongoDB や Splunk などをサポートするアダプタが提供されているほか、Cascading の Lingual でも利用されています。

Hazelcast
http://www.hazelcast.com

Java で実装されたインメモリのデータ分散プラットフォームです。Queue、Set、List、Map といったオブジェクトの格納や Lock、Topic といった機能を分散システムとして提供しています。

続いて、Drill 特有のコンポーネントの用語説明をしておきましょう。

分散システムである Drill を構成する各サーバノードは DrillBit と呼ばれます。各 DrillBit は処理のコーディネーション、クエリプランニング、処理の実行などのすべての役割を全ノードが共通で持っています。DrillBit のメンバーシップは ZooKeeper によって管理され、さらにメタデータやデータ配置情報などのリポジトリとして Hazelcast を利用した分散キャッシュを共有しています。 

f:id:nagixx:20131216120915p:plain

DrillBit 間の RPC 通信を担っているのは、DrillBit 内の BitCom です。各 DrillBit はサーバとしてもクライアントしても動作するため、両方の動作に対応します。DrillBit 間では後述する Fragment や、進行状態の通知、結果データなどが Protobuf メッセージとして送受信されます。

さて、それでは Drill のクエリの流れを見てみましょう。

クライアント側の処理

クライアントから WHERE や LIMIT などの条件がついた SELECT 文を投げてみると仮定しましょう。クライアントで受け付けられたクエリは、まず SQLLine によって JDBC 経由で Optiq に渡されます。

Optiq はクエリをパースし、SQL ステートメントを Logical プランに変換します。Optiq では、この SQL から処理オブジェクトへのマッピングルールを置き換えることができる Pluggable な設計になっているため、Drill ではカスタムルールを定義して、特定の SQL オペレータを Drill が理解する「Logical Operator」シンタックスマッピングしています。

この Logical Operator を組み立てて、必要な構成情報をインプットすることで、Drill の Logical プランができあがります。Logical プランはクエリの結果を生成するために Drill がどんな処理を行えばよいか、ということを記述するためのもので、この段階では最適化や効率的な分散については考慮されません。

Logical プランができあがると、クライアントは DrillBit ホストの中からひとつを選び、その DrillBit に Logical プランを送ります。

f:id:nagixx:20131216113857p:plain

Logical プランから Physical プランへ

クラスタ内の DrillBit はいずれもクライアントからのリクエストを受け付けることができます。リクエストを受け取った DrillBit は Drill プロセスとして、結果をクライアントに返す役割を担います。

この DrillBit 内の UserServer が Logical プランを受け取った後、プランは Foreman に渡されます。Foreman の役割は、Logical プランをより具体的な実行プランに落とし込むことです。(ちなみに Foreman には監督、親方という意味があります。)

Foreman の中で行われていることを少し詳しく見てみましょう。まず Logical プランは Drill のオプティマイザにより Physical プランに変換されます。ちなみに現時点での Drill のオプティマイザは非常にシンプルなもので、単に個々の Logical Operator を 1 つ以上の Physical Operator に変換しているだけです。Operator 間の関連を見ることでより最適化できる余地はありますが、これはこれから徐々に整備されていくでしょう。

Physical プランは Physical Operator の非循環有向グラフ(DAG)で、各ノード間の関係がデータフローを表します。このグラフは、MPP システムでよく使われる多段の実行ツリーです。ツリー内の各ノードはそれぞれ個別の DrillBit プロセスを表し、1つの処理結果が別の入力になるという依存関係になっています。

f:id:nagixx:20131216154113j:plain

多段の実行ツリーに分解する際に、まずそれぞれの Physical Operator の実行に関連する情報を集める必要があります。Physical Operator に構成情報を適用することで、ネットワーク、CPU、メモリ、ディスクといった物理リソースのコストと行サイズ、行数の見積もりが行われます。同時に、処理を行うのに適切な DrillBit の評価が行われます。この、適切な DrillBit を選定する際に用いられる指標は Endpoint Affinity(エンドポイントとしての相性の良さ)と呼ばれます。

Endpoint Affinity の例として、Parquet Scan Operator のケースを見てみましょう。このスキャン処理は、処理効率を考えるとなるべく対象の Parquet ファイルが格納されている場所で実行されるべきです。そこで、ファイルのメタ情報にアクセスし、データが格納されている(複数の)HDFS DataNode を特定して、そこで DrillBit が稼働している場合はそれらを Endpoint の候補とすべく Affinity の値を設定します。

Physical プラン、コストとサイズの見積もり、Endpoint Affinity が揃ったところで、次に Foreman の中の Parallelizer が、このプランを複数の Fragment に分解します。これが最終的な実行プランです。分解された Fragment は、各 DrillBit ノードで実行される独立した Physical プランそのものです。どのような Physical プランにも、(クエリを受け取った DrillBit で実行される)一つの Root Fragment と、複数の Leaf Fragment および Intermediate(中間)Fragment が含まれます。

f:id:nagixx:20131216154053j:plain

Fragment の実行

Root Fragment はクエリを最初に受け取った DrillBit の Worker Manager に登録され、Intermediate Fragment は Hazelcast 分散キャッシュに格納されます。一方、Leaf Fragment は BitCom によって割り当てられた DrillBit に、RPC 経由で Protobuf メッセージにより直接送られます。

Worker Manager は Root Fragment を受け取ると、早速プランの実行を開始します。Root Fragment には必ず Screen Operator が含まれていますが、これは実行をブロックし、返すべき結果が揃うまで待つという Operator です。もしプランの実行に複数の DrillBit が関与する場合には、さらに Exchange Operator も含みます。Exchange Operator は他のノードから戻ってくる結果を待つための Receiver をセットするための Operator です。

送信された Leaf Fragment は、ノードに到着すると直ちに実行が開始されます。Leaf Fragment はパースされて Physical Operator の DAG となり、実行フロー・データフローが準備されます。それぞれの Physical Operator は Pull スタイルのメッセージを発します。ツリーの終端から始まって、それぞれのノードは親に処理結果データの Pull を要求し、親はデータとともにデータのステータスを返します。Drillでは動的なスキーマの変更が許されているため、対象のレコードセットに対する新しいスキーマが適用された際にはそのステータスが通知されるので、各 Operator は新しいスキーマについて対応を行わなければなりません。

Drill で扱われるデータレコードは、RecordBatch というある程度の大きさの単位で扱われます。さらに、Drill は ValueVector という独自のインメモリカラムナフォーマットを実装しており、RecordBatch には各カラムに対応した ValueVector が複数並ぶ形で含まれます。ValueVector は同じカラム内の一連の値を表現するバイト列の集合です(Drill のデータフォーマットは、様々なケースの処理性能やフットプリントを考慮して設計されており、もう少し詳しい説明はこちらもご参考に)。Physical Operator 内のメッセージの Pull により RecordBatch が返ってきます。

f:id:nagixx:20131216163530p:plain

先に説明した Parquest ファイルを対象とした Scan Operator の例では、Parquet ストレージエンジンで実際のスキャン処理が行われています。ストレージエンジンはデータソースからデータを読み込み、それを ValueVector に変換し、RecordBatch として子に引き渡す、という処理を担っています。

最終的に、Leaf Fragment はこれら一連の処理を行い、Sender Operator により Intermediate DrillBit に結果が送られます。

Intermediate DrillBit は RecordBatch を受け取ると、RecordBatch にひもづいた Fragment ID をもとに HazelCast から該当の Intermediate Fragment を取り出します。そして、その DrillBit で必要な処理のための Receiver と Physical Opreator の準備をします。

Intermediate Fragment には Filtering Operator が含まれています。この Operator では、RecordBatch を受け取るとスキーマが取り出され、指定されたフィルタ表現と型情報とともに CodeGeneration に渡されます。ここでは、キャストを避け、タイトなループを回し、関数呼び出しを避けて、効率よくプリフェッチが行われるように設計されています。これは、Stinger 向けに実装されている Hive の新しいベクトル化クエリエンジンや、Impala の実装と同様のアプローチです。

Intermediate Fragment は、最終的に一連のデータをクエリを最初に受け付けた DrillBit に流し込みます。そして、それは Screen Operator が受け取り、クライアントに返されます。

クライアントは RecordBatch を受け取ると、カラムナ型の ValueVector を行に変換し、その結果を出力します。

 

以上、現時点での Drill のデータフローの概要を説明しました。まだ Drill は Alpha バージョンですので、この先様々なテストが行われ、アーキテクチャの検証が行われるでしょう。また、Operator やストレージエンジンなどの Pluggable な部分は、対応が進むにつれてできることが増えていくので、こちらも楽しみです。 

さて、明日の Hadoop アドベントカレンダー@uprush さんです。お楽しみに!

 

参考: Apache Drill Technical Overview / Lifetime of a Query in Drill Alpha Release

MapR Technologiesに入社しました

半年ぶりの更新です。今年3月にEMCを辞めてからしばらくウェアラブルテクノロジー系の仕事をしていたのですが、半年を経てまたビッグデータ&Hadoopの世界に戻ってきました。というわけで、本日10/1、MapR Technologiesに入社したことをご報告します。

改めてMapRに関してですが、3大Hadoopディストリビューションベンダーの一角を占める、シリコンバレーを本拠地とする急成長中の企業です。数日前にも「Hadoopベンダーのマップアールが日本進出、リクルートなどが採用」などのニュースが流れましたが、今年に入ってロンドン、ミュンヘン、パリ、ストックホルム、そして東京と相次いで拠点を拡げています。Hadoop通の間では、特にそのユニークな分散ファイルシステムのイノベーションにより、強固で真の「エンタープライズ品質」なHadoopディストリビューションとして知られています(誇張じゃないですよ!)。

私個人としては前々職でMapR製品を扱っていた関係でそこそこ製品の中身は把握しているつもりですが、ここ半年の間にHBaseを強化したM7 Editionのリリースがあり、時期バージョンの開発も進んでいるようで、急いで追いつかなくてはいけません。幸いなことに、本日からは社員としての参加なので、より濃い情報に触れることができそうで楽しみです。

早速ですが、つい先ほどMapR本社のあるサンノゼについたばかりです。3週間ほど、こちらで勉強して日本に戻りたいと思います。

Data Science Summit 2012 レポート

2012年5月23日にラスベガスで開催された Data Science Summit 2012 に参加してきました。Hadoop ソースコードリーディング第10回でも発表させていただきましたが、下のスライドは講演内容をレポートにまとめたものです。

Data Science Summit は EMC Greenplum 部門が主催するイベントで、去年に引き続き今年が2回目の開催です。コンセプトとしては、アカデミア、ソーシャルエンタープライズ、スタートアップ、公共セクタなど各界のリーダーがネタを持ち寄り「Data Drivenな世界」への道筋を示す集い、ということなので、ほとんどベンダー色のない内容になっています。

とはいうものの、統計関連の学会のように研究者たちが出てきて解析手法やモデルの新しい提案についてガチで議論を繰り広げる、というようなものでもなくて、ネットサービスから学術研究までの幅広い先端事例や、そこで生まれてくる社会的な課題、教育のあり方、人々の意識に与える影響など、わりと幅広いテーマを一般の人にも分かりやすい形で示すという感じでした。

計9セッションのすべての講演のビデオは次のサイトでも見られますので、ご興味のある方はぜひ。
http://www.greenplum.com/datasciencesummit/videos

インサイド MapR (1) (Hadoop アドベントカレンダー 2011 16日目)

この記事は Hadoop アドベントカレンダー 2011 の16日目の記事です。

今年の5月にMapR Technologies社から発表された、独自実装のHadoopディストリビューション(以降、単純にMapRと呼びます)ですが、そのユニークな特徴やオープンソース実装のはるか先をいく先進性から、いろいろなところで注目を集めています。ただ、非常に多彩な機能を持ちながら、これどうやって実現しているの、という風に思っている方も多いと思います。私はお仕事柄MapRの実装を若干詳しく知る立場におりますので、MapRの中身を少し掘り下げてみたいと思います。

MapRはHadoopのどこを改良しているのか

MapR TechnologiesはもともとGoogleでGFS、BigTableMapReduceなどの検索基盤技術を担当していたエンジニア M.C. Srivas 氏などが中心となって3年ほど前に創業された会社ですが、今年製品を発表する前まではひっそりと、いわゆる「ステルスモード」で最適化Hadoop実装の研究開発を進めていました。3年前はすでにHadoopの可能性について世間の注目は集まりはじめていましたが、当時から実装の非効率性や単一障害点の存在などの課題は指摘されていました。ただ、オープンソースコミュニティのプロジェクトなので大きなアーキテクチャ上の変更を伴う提案は合意を得るのはなかなか難しく、開発の速度も遅いため、それであれば企業が本当に必要とする高速・高信頼の実装を自分たちで速攻で作って世に出してしまえばよいではないか、ということで投資家から資金を得て開発を進めたというのが経緯です。

ではMapRが改良を加えた部分は何か、というところですが、主にファイルシステムHDFS)の部分になります。HDFSAPIは100%Apache互換を保ちながら、内部実装をMapR-FSという完全に再設計した実装で置き換えています。これにより、ファイルシステムに対するランダムRead/Write、NFSアクセス、ボリューム管理、スナップショット、ミラーリング、ビルトイン圧縮といった多彩な機能を実現しています。

本エントリでは、特にボリューム管理と、ファイルシステムの基本となるコンテナの実装について書いてみたいと思います。

ボリュームという概念

MapRにはオープンソース実装にはない「ボリューム」という概念があります。オープンソース実装では、HDFSファイルシステムはディレクトリ構造がひとつあるだけで、その中を分けて管理するという発想はありませんが、MapRではボリューム単位で様々な管理ポリシーを設定して細かい制御を行うとともに、ボリュームをベースにしたスナップショットやミラーリングなどのデータ管理機能が用意されています。

ボリュームごとに設定できる項目には次のようなものがあります。

マウントポイント
ファイルシステムのディレクトリ構造の中で、ボリュームをどこにマウントするかを指定します。デフォルトで存在するルートボリュームのマウントポイントは「/」ですが、作成したボリュームを任意のディレクトリ(例えば/vol/vol1)にマウントできます。
容量上限
ボリュームの容量上限を設定します。上限を超えてファイルを作成しようとするとエラーになります。実際の上限値であるハードリミットと、それより小さい値でアラームを発生させるアドバイザリリミットを設定できます。
ボリュームの所有ユーザー
所有権をもつユーザーを指定します。
各ユーザーの管理権限
各ユーザーの管理権限(削除できるか、ダンプできるか、スナップショットを作成できるか等)を設定します。ボリューム内のファイルに対するアクセス権限とは違います。アクセス権限はファイルパーミッションにより設定します。
レプリケーション
MapRではレプリケーション数はボリューム単位で指定します。実際のレプリケーション数と、下回るとアラームを発生させる最低レプリケーション数を設定できます。
スナップショットのスケジュール
あらかじめスケジュールポリシーを設定しておくことにより、例えば毎日深夜0時に有効期間7日間のスナップショット(ボリュームの静止イメージ)を作る、といったことができます。スナップショットのイメージはファイルシステム内に作られ、その後の更新分はCopy-on-Write方式で差分のみ保管されるため、スナップショットの作成は一瞬で完了し、効率的なストレージ利用になります。
ミラーリング
あるボリュームに対し、ミラー先ボリュームを設定することで、ボリューム間のデータ同期を図ることができます。ミラー先ボリュームは読み込み専用ではありますが、クラスタ内で複数のミラーボリュームを作成することでアクセスの負荷分散をしたり、アプリケーション間での効率的なデータ移行を行うことができます。また、遠隔地のクラスタにあるボリュームとミラー構成を組めば、災害対策(DR)としても活用可能です。
トポロジーに対するマッピング
トポロジーとはクラスタの物理的なノード配置をツリー構造で記述しておく設定です。一般的にはひとつのラックに格納されたサーバを一つのトポロジーグループに配置させるといったような構成をとりますが、ボリュームをトポロジにマッピングすることにより、例えば特定ボリュームを特定のラック内のサーバに張り付け、その中だけでデータを分散配置させる、という設定ができます。

コンテナ管理によるスケーラビリティの向上と分散NameNode

オープンソース実装では各ファイルのメタ情報は独立したサーバで稼働するNameNodeで管理されます。この方法には2つの大きな課題があります。1つ目はファイル数が増えるに従って扱うメタ情報も大きくなり、性能が劣化するという点です。各ファイルは一定のブロックサイズで分割され、NameNodeはすべてのデータブロックの配置を管理しているために、アクセスが1点に集中するためです。2つ目はメタ情報の管理が1カ所で行われるため、NameNodeのサーバに障害が発生すると、クラスタのデータアクセスがすべて失われる点です。オープンソース実装でもメタ情報のレプリケーションは行いますし、HAミドルウェアを使って自動フェールオーバーを行うように構成することも可能ですが、追加の構築やテストの工数が増えて大変です。

MapRではこの2つの課題に対応するために、データブロックの管理にコンテナという概念を追加し、非常に巧妙なかたちでメタ情報管理の分散と耐障害性の向上を図っています。

まずコンテナですが、コンテナにはデータコンテナ、ネームコンテナの2種類があります。データコンテナはデータブロックを複数まとめて格納する管理単位です。デフォルトでは16GBの大きさで、この中に入る分だけのデータブロックを格納します。上で説明したボリュームは複数のデータコンテナから構成されます。一方、ネームコンテナはそれぞれのボリュームに1つだけ存在し、ボリューム内のファイルのメタ情報を格納します。オープンソース実装のNameNodeの機能は、このネームコンテナが代替します。ネームコンテナはデータコンテナ同様クラスタ内のノードに分散され、このためNameNodeを独立して稼働させる必要がなく、これが「分散NameNode」といわれるゆえんです。

ところで、各コンテナの配置は誰が管理しているの?という疑問をお持ちの方がいるかもしれません。これに関してはCLDB (Container Location Database) というサービスが管理をしています。なんだ、結局そこが単一障害点になるんじゃないの、という声が聞こえてきそうですが、CLDBは最大3ノードまでのHA構成が可能で、しかもNameNodeとは違ってCLDBで管理する情報はコンテナの位置情報とバージョン情報だけですので、相当大規模な構成でも余裕でメモリに乗り切るサイズな上に、フェールオーバーも高速です。

下にコンテナによるメタ情報管理の概念図を示します。メタ情報管理が2段構成になるようなイメージですね。

次にレプリケーションについて説明します。データのレプリケーションの管理はコンテナ単位で行われます。データコンテナだけでなく、メタ情報を格納するネームコンテナもデータと同じレプリケーションのしくみを使い、耐障害性を確保しています。デフォルトでは各コンテナは3つのレプリカをノードをまたがった形で作りますが、そのうち1つがマスターコンテナになり、残りのレプリカコンテナがデータ更新のための「レプリケーションチェーン」によってマスターとつながれる形になります。

レプリケーションチェーンの構造はネームコンテナとデータコンテナでは異なり、ネームコンテナに関してはマスターを中心としたスター型、データコンテナに関してはマスターを起点とするデイジーチェーン型になっています。これはメタ情報の更新単位は小さいので同時に更新を行っても性能への影響は小さく、一方ファイルデータブロックの更新は単位が大きいので負荷の分散を図って性能への影響を抑えるためです。データの更新はまずマスターに対して行われ、さらにレプリケーションチェーンを辿って各レプリカで更新が行われた後に、今度はチェーンを逆に辿って更新の完了がマスターに通知されます。この一連の更新フローはトランザクションとして扱われ、万が一更新中にどこかのノードで障害が起きた場合にはすべての更新は取り消されるため、データの一貫性が保たれます。

ネームコンテナはボリュームごとに異なるサーバノードに作成されるため、メタ情報に対するアクセスは(少なくともノード数以上のボリュームがあれば)全ノードに対して分散されます。また、各コンテナのレプリケーションチェーンにおけるマスターコンテナも全ノードに分散されるため、データの更新についてもどこかのノードに集中しない形になります。これをまとめると下のようなイメージになります。


以上、今回書ききれなかった部分もたくさんありますので、また次回も続けて行きたいと思います。