読者です 読者をやめる 読者になる 読者になる

Apache Drill ではクエリ処理はこんな風に行われる(Hadoop アドベントカレンダー 2013 16日目)

Drill

この記事は Hadoop アドベントカレンダー 2013の16日目の記事です。

Apache Drill 概要

f:id:nagixx:20131216193805p:plain

Apache DrillGoogle Dremel に触発されて開発されたオープンソースプロジェクトで、2012年8月に Apache Incubator として提案されました。大規模データに対し、バッチ処理ではなく、インタラクティブなクエリの実行を可能にするという意味では、ImpalaやStinger、Presto といったプロジェクトと同様の大きな目的を持っていますが、SQL 2003 完全準拠、パーサや処理実行部分を Pluggable にして幅広いクエリ言語やデータソースに対応、JSON/Avro のようなネストデータに対応、スキーマはオプショナルでスキーマレスなデータに対応、といったところなど、より柔軟で拡張性の高いフレームワークを目指しているのが他のプロジェクトとの違いです。開発の中心は MapR Technologies ですが、開発途上で OpenDremel プロジェクトが合流し、業界他社の複数のエンジニアの参画を得ています。

f:id:nagixx:20131216113825p:plain

上記の図が Drill の構成要素ですが、処理対象は HDFS 上のファイルだけではなく、HBase や Cassandra、MongoDB なども念頭に置かれていることがわかります。

説明の前に

本稿では Drill のクエリ処理のフローを説明しようと思いますが、その前に Drill で使われているオープンソースのソフトウェアコンポーネントや Drill 特有のコンポーネントについて、簡単に説明しておきましょう。

Apache Drill では、いろいろなオープンソースのソフトウェアコンポーネントが使われています。主なものを簡単に紹介します。

SQLLine
http://sqlline.sourceforge.net

リレーショナルデータベースの接続とSQLの実行のための、ピュア Java のコンソールユーティリティです。Oracle の sqlplus、MySQLmysql に相当する機能を提供するといえばわかりやすいかと思います。

Optiq
http://www.hydromatic.net/optiq/

JDBC サーバ、SQL パーサ、クエリオプティマイザを含むデータ管理フレームワークです。データソース、処理リソース、クライアントの中間に位置し、拡張可能なオペレータや変換ルールを追加することにより、様々なデータに対して柔軟なクエリ処理を実行することができるようになります。MongoDB や Splunk などをサポートするアダプタが提供されているほか、Cascading の Lingual でも利用されています。

Hazelcast
http://www.hazelcast.com

Java で実装されたインメモリのデータ分散プラットフォームです。Queue、Set、List、Map といったオブジェクトの格納や Lock、Topic といった機能を分散システムとして提供しています。

続いて、Drill 特有のコンポーネントの用語説明をしておきましょう。

分散システムである Drill を構成する各サーバノードは DrillBit と呼ばれます。各 DrillBit は処理のコーディネーション、クエリプランニング、処理の実行などのすべての役割を全ノードが共通で持っています。DrillBit のメンバーシップは ZooKeeper によって管理され、さらにメタデータやデータ配置情報などのリポジトリとして Hazelcast を利用した分散キャッシュを共有しています。 

f:id:nagixx:20131216120915p:plain

DrillBit 間の RPC 通信を担っているのは、DrillBit 内の BitCom です。各 DrillBit はサーバとしてもクライアントしても動作するため、両方の動作に対応します。DrillBit 間では後述する Fragment や、進行状態の通知、結果データなどが Protobuf メッセージとして送受信されます。

さて、それでは Drill のクエリの流れを見てみましょう。

クライアント側の処理

クライアントから WHERE や LIMIT などの条件がついた SELECT 文を投げてみると仮定しましょう。クライアントで受け付けられたクエリは、まず SQLLine によって JDBC 経由で Optiq に渡されます。

Optiq はクエリをパースし、SQL ステートメントを Logical プランに変換します。Optiq では、この SQL から処理オブジェクトへのマッピングルールを置き換えることができる Pluggable な設計になっているため、Drill ではカスタムルールを定義して、特定の SQL オペレータを Drill が理解する「Logical Operator」シンタックスマッピングしています。

この Logical Operator を組み立てて、必要な構成情報をインプットすることで、Drill の Logical プランができあがります。Logical プランはクエリの結果を生成するために Drill がどんな処理を行えばよいか、ということを記述するためのもので、この段階では最適化や効率的な分散については考慮されません。

Logical プランができあがると、クライアントは DrillBit ホストの中からひとつを選び、その DrillBit に Logical プランを送ります。

f:id:nagixx:20131216113857p:plain

Logical プランから Physical プランへ

クラスタ内の DrillBit はいずれもクライアントからのリクエストを受け付けることができます。リクエストを受け取った DrillBit は Drill プロセスとして、結果をクライアントに返す役割を担います。

この DrillBit 内の UserServer が Logical プランを受け取った後、プランは Foreman に渡されます。Foreman の役割は、Logical プランをより具体的な実行プランに落とし込むことです。(ちなみに Foreman には監督、親方という意味があります。)

Foreman の中で行われていることを少し詳しく見てみましょう。まず Logical プランは Drill のオプティマイザにより Physical プランに変換されます。ちなみに現時点での Drill のオプティマイザは非常にシンプルなもので、単に個々の Logical Operator を 1 つ以上の Physical Operator に変換しているだけです。Operator 間の関連を見ることでより最適化できる余地はありますが、これはこれから徐々に整備されていくでしょう。

Physical プランは Physical Operator の非循環有向グラフ(DAG)で、各ノード間の関係がデータフローを表します。このグラフは、MPP システムでよく使われる多段の実行ツリーです。ツリー内の各ノードはそれぞれ個別の DrillBit プロセスを表し、1つの処理結果が別の入力になるという依存関係になっています。

f:id:nagixx:20131216154113j:plain

多段の実行ツリーに分解する際に、まずそれぞれの Physical Operator の実行に関連する情報を集める必要があります。Physical Operator に構成情報を適用することで、ネットワーク、CPU、メモリ、ディスクといった物理リソースのコストと行サイズ、行数の見積もりが行われます。同時に、処理を行うのに適切な DrillBit の評価が行われます。この、適切な DrillBit を選定する際に用いられる指標は Endpoint Affinity(エンドポイントとしての相性の良さ)と呼ばれます。

Endpoint Affinity の例として、Parquet Scan Operator のケースを見てみましょう。このスキャン処理は、処理効率を考えるとなるべく対象の Parquet ファイルが格納されている場所で実行されるべきです。そこで、ファイルのメタ情報にアクセスし、データが格納されている(複数の)HDFS DataNode を特定して、そこで DrillBit が稼働している場合はそれらを Endpoint の候補とすべく Affinity の値を設定します。

Physical プラン、コストとサイズの見積もり、Endpoint Affinity が揃ったところで、次に Foreman の中の Parallelizer が、このプランを複数の Fragment に分解します。これが最終的な実行プランです。分解された Fragment は、各 DrillBit ノードで実行される独立した Physical プランそのものです。どのような Physical プランにも、(クエリを受け取った DrillBit で実行される)一つの Root Fragment と、複数の Leaf Fragment および Intermediate(中間)Fragment が含まれます。

f:id:nagixx:20131216154053j:plain

Fragment の実行

Root Fragment はクエリを最初に受け取った DrillBit の Worker Manager に登録され、Intermediate Fragment は Hazelcast 分散キャッシュに格納されます。一方、Leaf Fragment は BitCom によって割り当てられた DrillBit に、RPC 経由で Protobuf メッセージにより直接送られます。

Worker Manager は Root Fragment を受け取ると、早速プランの実行を開始します。Root Fragment には必ず Screen Operator が含まれていますが、これは実行をブロックし、返すべき結果が揃うまで待つという Operator です。もしプランの実行に複数の DrillBit が関与する場合には、さらに Exchange Operator も含みます。Exchange Operator は他のノードから戻ってくる結果を待つための Receiver をセットするための Operator です。

送信された Leaf Fragment は、ノードに到着すると直ちに実行が開始されます。Leaf Fragment はパースされて Physical Operator の DAG となり、実行フロー・データフローが準備されます。それぞれの Physical Operator は Pull スタイルのメッセージを発します。ツリーの終端から始まって、それぞれのノードは親に処理結果データの Pull を要求し、親はデータとともにデータのステータスを返します。Drillでは動的なスキーマの変更が許されているため、対象のレコードセットに対する新しいスキーマが適用された際にはそのステータスが通知されるので、各 Operator は新しいスキーマについて対応を行わなければなりません。

Drill で扱われるデータレコードは、RecordBatch というある程度の大きさの単位で扱われます。さらに、Drill は ValueVector という独自のインメモリカラムナフォーマットを実装しており、RecordBatch には各カラムに対応した ValueVector が複数並ぶ形で含まれます。ValueVector は同じカラム内の一連の値を表現するバイト列の集合です(Drill のデータフォーマットは、様々なケースの処理性能やフットプリントを考慮して設計されており、もう少し詳しい説明はこちらもご参考に)。Physical Operator 内のメッセージの Pull により RecordBatch が返ってきます。

f:id:nagixx:20131216163530p:plain

先に説明した Parquest ファイルを対象とした Scan Operator の例では、Parquet ストレージエンジンで実際のスキャン処理が行われています。ストレージエンジンはデータソースからデータを読み込み、それを ValueVector に変換し、RecordBatch として子に引き渡す、という処理を担っています。

最終的に、Leaf Fragment はこれら一連の処理を行い、Sender Operator により Intermediate DrillBit に結果が送られます。

Intermediate DrillBit は RecordBatch を受け取ると、RecordBatch にひもづいた Fragment ID をもとに HazelCast から該当の Intermediate Fragment を取り出します。そして、その DrillBit で必要な処理のための Receiver と Physical Opreator の準備をします。

Intermediate Fragment には Filtering Operator が含まれています。この Operator では、RecordBatch を受け取るとスキーマが取り出され、指定されたフィルタ表現と型情報とともに CodeGeneration に渡されます。ここでは、キャストを避け、タイトなループを回し、関数呼び出しを避けて、効率よくプリフェッチが行われるように設計されています。これは、Stinger 向けに実装されている Hive の新しいベクトル化クエリエンジンや、Impala の実装と同様のアプローチです。

Intermediate Fragment は、最終的に一連のデータをクエリを最初に受け付けた DrillBit に流し込みます。そして、それは Screen Operator が受け取り、クライアントに返されます。

クライアントは RecordBatch を受け取ると、カラムナ型の ValueVector を行に変換し、その結果を出力します。

 

以上、現時点での Drill のデータフローの概要を説明しました。まだ Drill は Alpha バージョンですので、この先様々なテストが行われ、アーキテクチャの検証が行われるでしょう。また、Operator やストレージエンジンなどの Pluggable な部分は、対応が進むにつれてできることが増えていくので、こちらも楽しみです。 

さて、明日の Hadoop アドベントカレンダー@uprush さんです。お楽しみに!

 

参考: Apache Drill Technical Overview / Lifetime of a Query in Drill Alpha Release