HBase データソースに対する Pushdown (2)

この記事は Apache Drill Advent Calendar 2015 の16日目の記事です。

前回の記事からの続きです。

前回は HBase テーブルを対象としたクエリに WHERE 句で条件を加えることで、HBase 側で Pushdown を行う実行プランが作成されている様子を確認しました。では、もう少し複雑な条件ではどうでしょうか。2つの行キーの比較を OR で結んだ条件にしてみます。下にクエリとその実行プランの HBaseScanSpec のパラメータを示します。

SELECT row_key, cf.c1
FROM hbase.`table1`
WHERE row_key = 'xxx' OR row_key = 'yyy';
startRow=xxx, stopRow=yyy\x00,
filter=FilterList OR (2/2): [
  RowFilter (EQUAL, xxx),
  RowFilter (EQUAL, yyy)
]

スキャンの開始行キーは「xxx」、終了行キーは「yyy\x00」(終了行キー自体はスキャン範囲には含まれないので、末尾に \x00 をつけて yyy も範囲に含めるようにしている)となり、範囲スキャンを行うようにした上で、RowFilter で条件の合う行キーのチェックをしています。複数の RowFilter は FilterList により OR で結合する形になっています。それなりに効率的ですね。

次に、LIKE 演算子で条件を指定してみましょう。

SELECT row_key, cf.c1
FROM hbase.`table1`
WHERE row_key LIKE 'xxx%';
startRow=xxx, stopRow=xxy,
filter=RowFilter (EQUAL, ^\x5CQxxx\x5CE.*$)

行キーに対する条件が前方一致であるため、範囲スキャンが有効になっていることがわかります。これに加え、LIKE に対応する正規表現が RowFilter のパターン文字列に指定されています。正規表現のマッチングですので、HBase API では RegexStringComparator が使われているはずです。

では、次のような LIKE の後方一致ではどうでしょうか。

SELECT row_key, cf.c1
FROM hbase.`table1`
WHERE row_key LIKE '%xxx';
startRow=, stopRow=,
filter=RowFilter (EQUAL, ^.*\x5CQxxx\x5CE$)

後方一致では行キーの範囲を絞り込めず、フルスキャンになることがわかります。

これまでの例では、暗黙的に行キーやカラムの値が UTF-8 文字列であることを想定してクエリを記述していましたが、HBase に格納した値が数値型やタイムスタンプ型だった場合は、クエリの書き方に少し注意が必要です。もし行キーの値がビッグエンディアンで格納されていれば、数値や日付の大きさ順でソートされることになるため、範囲フィルタの Pushdown を有効に利用できる可能性があります。

SELECT row_key, cf.c1
FROM hbase.`table1`
WHERE CONVERT_FROM(BYTE_SUBSTR(row_key, 1, 8), 'UINT8_BE')
BETWEEN CAST(100 AS BIGINT) AND CAST(200 AS BIGINT);
startRow=\x00\x00\x00\x00\x00\x00\x00d,
stopRow=\x00\x00\x00\x00\x00\x00\x00\xC9,
filter=null

上の例では WHERE 句の中で、まず row_key に対し BYTE_SUBSTR 関数で格納されている数値のバイト長(ここでは8バイト)分を先頭から取り出しています。さらにそれを CONVERT_FROM 関数で符号なしのビッグエンディアン(ここでは UNIT8_BE)のエンコーディングで数値型に変換しています。そして同じバイト長の数値型(ここでは BIGINT 型)にキャストした数値と比較します。これらの条件をすべて満たすことにより、範囲スキャンの Pushdown が有効になります。

最後に、Drill は [HBASE-8201] OrderedBytes: an ordered encoding strategy で HBase に追加された、OrderedBytes と呼ばれるエンコーディング手法に対応しています。このエンコーディングでは、データがバイト列として格納されるときにそのデータ型のソート順を保持することができるため、Pushdown に活用することが可能です。

SELECT row_key, cf.c1
FROM hbase.`table1`
WHERE CONVERT_FROM(row_key, 'INT_OB')
BETWEEN CAST(-32 AS INT) AND CAST(59 AS INT);
startRow=+\x7F\xFF\xFF\xE0, stopRow=+\x80\x00\x00;\x00,
filter=FilterList AND (2/2): [
  RowFilter (GREATER_OR_EQUAL, +\x7F\xFF\xFF\xE0),
  RowFilter (LESS_OR_EQUAL, +\x80\x00\x00;)
]

HBase にデータが OrderedBytes で格納されている場合、上のように数値を「INT_OB」エンコーディングで変換して比較することで、範囲スキャンが有効になっていることがわかります。

以上、様々な Pushdown による最適化を紹介しましたが、将来的にはこれ以上のテクニックが駆使されてさらに最適化が進むことが予想されます。例えば、HBase の行キーが複合キー(Intelligent Key)である場合の扱いにはまだ改良の余地があります。今後の性能向上を期待しましょう。

HBase データソースに対する Pushdown (1)

この記事は Apache Drill Advent Calendar 2015 の15日目の記事です。

HBase は Hadoop 上で動作する、「ワイドカラム型」NoSQL データベースです。RDBMS 風のテーブル構造を持ちますが、固定のスキーマを持つわけではないのでデータ構造の変更には柔軟である一方、HBase 自体にはシンプルなアクセス API があるのみで SQL クエリ機能はありません。Drill は HBase をデータストアとして使用することができるため、HBase に格納されたデータを使い慣れた SQL で柔軟に取り扱えるようになるのがメリットです。

ところが、Drill のようにデータストアと実行エンジンが分離しているアーキテクチャで、最適化をまったく考えずに実装してしまうと性能が全然出ません。最も単純な実装として、HBase でテーブルをフルスキャンして Drill 側にデータを渡し、Drill が SQL クエリの実行に必要なフィルタリングや集計を行う、という役割の分離が考えられますが、HBase のフルスキャンかなり遅いです。まあ、これは HBase に限らず Drill が使用するどのようなデータストア(MongoDB や RDBMS とか)にも当てはまります。

そこで、このような状況では一般的に Pushdown という最適化が行われます。HBase の例で言うと、まず HBase では「テーブルの各行にはユニークな行キーが存在しており、ソートされインデックスがついた状態で格納されている」という前提があるため、クエリの述語(WHERE 句の条件)によってあらかじめ行キーのスキャン範囲を絞り込めれば、I/O アクセスが大幅に減り性能向上につながります。他にも、カラムの値の比較が必要な場合に、HBase 内部の実装で比較を行ってから結果を Drill 側に渡してやったほうがより効率的で高速です。このように、より低いレイヤーの実装に処理を移譲する最適化が Pushdown です。

最近 BigQueryで150万円溶かした人の顔というエントリで、「WHERE句には何を書いてもテーブルをフルスキャンしてしまう」というのが話題になっておりましたが、BigQuery に述語の Pushdown の実装がなされていればこんなことにはならなかったはずです。Drill は BigQuery のベースになっている Dremel のオープンソース実装なので、Drill にできて BigQuery にできないことはないと思うのですが、謎ですね。

さて、実際にどのようなクエリがどんな風に Pushdown されるのかを見てみましょう。EXPLAIN PLAN FOR を SELECT の前につけることでクエリの実行プランが表示されるので、違いを確認することができます。

まず HBase テーブル全体を SELECT するクエリ。

$ apache-drill-1.3.0/bin/drill-embedded
0: jdbc:drill:zk=local> EXPLAIN PLAN FOR
. . . . . . . . . . . > SELECT row_key, cf.c1
. . . . . . . . . . . > FROM hbase.`table1`;
+------+------+
| text | json |
+------+------+
| 00-00    Screen
00-01      Project(row_key=[$0], EXPR$1=[$1])
00-02        Project(row_key=[$0], ITEM=[ITEM($1, 'c1')])
00-03          Scan(groupscan=[HBaseGroupScan [HBaseScanSpec=HBaseScanSpec [tableName=table1, startRow=null, stopRow=null, filter=null], columns=[`row_key`, `cf`.`c1`]]])
...

そして WHERE 句で行キーに条件をつけた場合のクエリ。

0: jdbc:drill:zk=local> EXPLAIN PLAN FOR
. . . . . . . . . . . > SELECT row_key, cf.c1
. . . . . . . . . . . > FROM hbase.`table1`
. . . . . . . . . . . > WHERE row_key = 'xxx';
+------+------+
| text | json |
+------+------+
| 00-00    Screen
00-01      Project(row_key=[$0], EXPR$1=[$1])
00-02        Project(row_key=[$0], ITEM=[ITEM($1, 'c1')])
00-03          Scan(groupscan=[HBaseGroupScan [HBaseScanSpec=HBaseScanSpec [tableName=table1, startRow=xxx, stopRow=xxx\x00, filter=null], columns=[`row_key`, `cf`.`c1`]]])
...

実行プランの Scan オペレーターのパラメーターの中に HBaseScanSpec という指定があり、さらに startRow, stopRow という指定もついていることがわかります。そして、WHERE 句をつけると startRow, stopRow に値が入ることも確認できます。

実際には、HBase の API で Scan オブジェクトが作られて、スキャンの開始行キー startRow、終了行キー stopRow がパラメータとして渡され、HBase の範囲スキャン操作が行われます。WHERE 句に行キーの条件をつけることで、大幅に処理が効率化されることがわかります。

別の例も見てみましょう。

0: jdbc:drill:zk=local> SELECT row_key, cf.c1
. . . . . . . . . . . > FROM hbase.`table1`
. . . . . . . . . . . > WHERE cf.c1 = 'xxx';
+------+------+
| text | json |
+------+------+
| 00-00    Screen
00-01      Project(row_key=[$0], EXPR$1=[$1])
00-02        Project(row_key=[$0], ITEM=[ITEM($1, 'c1')])
00-03          Scan(groupscan=[HBaseGroupScan [HBaseScanSpec=HBaseScanSpec [tableName=table1, startRow=null, stopRow=null, filter=SingleColumnValueFilter (cf, c1, EQUAL, xxx)], columns=[`row_key`, `cf`.`c1`]]])
...

今度は行キーの条件はつけていないため範囲スキャンは行われませんが、カラムの値の比較条件がついていることで、filter というパラメータに SingleColumnValueFilter の指定が出てきます。これは HBase のスキャン時に、そのまま SingleColumnValueFilter のパラメータとして HBase 内部のフィルタリング処理に使われます。

以上のように、Pushdown によるクエリ実行計画の最適化の様子がおわかりいただけたかと思いますが、クエリの書き方によっては Pushdown が期待通りに働かないケースもあります。そのあたりの注意点は次の記事に続きます。

テキストファイルとNULLの扱い

この記事は Apache Drill Advent Calendar 2015 の14日目の記事です。

CSV 形式などのテキストファイルでどのように NULL を表現するかは、CSV を出力する RDBMS やアプリケーション毎に異なっているので結構悩みのタネですね。

  • Oracle(SPOOL を使用): 引用符のない空文字
  • SQL Server: 引用符のない空文字
  • DB2: 引用符のない空文字
  • MySQL: \N
  • PostgreSQL: \N(テキスト)、引用符のない空文字(CSV

デフォルトでは上記のような感じだと思いますが、これってオプションで形式を変えることもできますし、この他にも色々なアプリケーションがあるので、その都度表現形式を確認して処理をする必要があります。

さて、Drill では CSV 形式などのテキストファイルをデータソースとする場合は、すべて可変長文字列型として扱われます。なので、次のようなファイルがあったとしたら、

$ cat test.csv
"Japan","14","0"
"United States","23","3"
"China",\N,"5"
"France","16",

Drill で SELECT した結果は次のようになります。\N も引用符のない空文字も、NULL ではなく空文字列として扱われているのがわかります。

$ apache-drill-1.3.0/bin/drill-embedded
0: jdbc:drill:zk=local> SELECT columns[0], columns[1], columns[2] FROM dfs.`/tmp/test.csv`;
+----------------+---------+---------+
|     EXPR$0     | EXPR$1  | EXPR$2  |
+----------------+---------+---------+
| Japan          | 14      | 0       |
| United States  | 23      | 3       |
| China          | \N      | 5       |
| France         | 16      |         |
+----------------+---------+---------+
4 rows selected (0.093 seconds)

では、例えば引用符のない空文字を NULL として扱いたい場合はどうすればよいでしょうか。

方法1: CASE を使う

CASE を使用して、空文字であれば NULL に置き換えます。この方法は NULL を表現するのに他の形式が使われたときにも有効です。あとは、カラム毎に扱いを変えたいときにも柔軟に対応できますね。

0: jdbc:drill:zk=local> SELECT
. . . . . . . . . . . >   columns[0],
. . . . . . . . . . . >   CASE WHEN columns[2] = '' THEN NULL ELSE columns[1] END,
. . . . . . . . . . . > FROM dfs.`/tmp/test.csv`;
+----------------+---------+
|     EXPR$0     | EXPR$1  |
+----------------+---------+
| Japan          | 0       |
| United States  | 3       |
| China          | 5       |
| France         | null    |
+----------------+---------+
4 rows selected (0.116 seconds)

方法2: drill.exec.functions.cast_empty_string_to_null プロパティを使う

drill.exec.functions.cast_empty_string_to_null プロパティを true に設定すると、空文字列が「キャストをする時に」NULL として扱われるようになります。キャストをしない場合には、そのまま空文字列として扱われます。

0: jdbc:drill:zk=local> ALTER SYSTEM SET `drill.exec.functions.cast_empty_string_to_null` = true;
+-------+----------------------------------------------------------+
|  ok   |                         summary                          |
+-------+----------------------------------------------------------+
| true  | drill.exec.functions.cast_empty_string_to_null updated.  |
+-------+----------------------------------------------------------+
1 row selected (0.08 seconds)
0: jdbc:drill:zk=local> SELECT columns[0], CAST(columns[2] AS INT) FROM dfs.`/tmp/test.csv`;
+----------------+---------+
|     EXPR$0     | EXPR$1  |
+----------------+---------+
| Japan          | 0       |
| United States  | 3       |
| China          | 5       |
| France         | null    |
+----------------+---------+
4 rows selected (0.139 seconds)

Drillbit が使用するメモリサイズ

この記事は Apache Drill Advent Calendar 2015 の12日目の記事です。

Drill クラスタを構築する場合、各ノードで Drillbit という Java プロセスを立ち上げます。Drillbit は、ノードに常駐するデーモンプロセスとしてクラスタ全体で協調して動作することで、クエリを効率的に並列処理します(ちなみに Embedded モードでも、Drillbit が1つだけローカルで起動します)。

Drillbit は Java プロセスなので当然ながら Java ヒープを確保して使用しますが、Java ヒープとは別にダイレクトメモリを確保して Drillbit が直接その内部を管理しています。クエリの処理対象のデータはダイレクトメモリ上に展開して保持され、処理はできる限りオンメモリで行うことでパフォーマンスの最大化が図られます。

MapReduce などとは異なり、クエリ処理の中間でメモリ上のデータをディスクにすべて書き出すということはありませんが、もしダイレクトメモリ上にすべてのデータが載り切らない場合には、一部のデータをディスク上に一時的に退避させることがあり、その際にはディスクアクセスが発生します。このため、ダイレクトメモリのサイズはクエリの実行パフォーマンスを大きく左右する要素です。

Drillbit が使用するメモリ設定は、Drill をインストールしたディレクトリの conf/drill-env.sh に書かれています。

$ cat apache-drill-1.3.0/conf/drill-env.sh
...
DRILL_MAX_DIRECT_MEMORY="8G"
DRILL_MAX_HEAP="4G"
...

DRILL_MAX_HEAP は Java の起動オプション -Xmx に渡される Java ヒープの最大サイズで、デフォルトで 4GB に設定されています。処理対象のデータをヒープに置くことはないため、ほとんどのケースでは変更する必要はないでしょう。

一方、DRILL_MAX_DIRECT_MEMORY はダイレクトメモリの最大値です(デフォルト 8GB)。ノード上の利用可能なメモリをできるだけ多く割り当てることで、パフォーマンスを最大化することができます。

ただし、クラスタノード上で他のプロセスが動作している場合には、設定値に注意してください。例えば Hadoop クラスタでは、HDFS や HBase などのデータストアや、YARN フレームワークを動作させているのが一般的です。Drill 1.3 の時点では、まだ Drill は YARN に対応していないため、Drill のメモリの割り当てと YARN のメモリ割り当ては、合計使用量が物理メモリ容量を超えないように別々に固定で設定しておく必要があります。例えば、64GB メモリのマシンがあった場合、YARN で 32GB、HDFS で 16GB を確保した場合は、Drill で利用可能なメモリを 12GB 程度に抑えておく必要があります。

クエリ毎のリソース制御について

この記事は Apache Drill Advent Calendar 2015 の11日目の記事です。

Drill クラスタを構築して、ある程度規模の大きい SQL クエリ基盤を運用する場合、普通は複数のユーザーや複数のアプリケーションで Drill クラスタを共有する使い方をすると思います。その時に課題となるのは、個々のクエリが使うリソースをどのように制御できるかというところです。本記事では、Drill が備えるリソース制御のオプションについて見てみましょう。

まず同時に実行可能なクエリの上限について。デフォルトでは同時に実行可能なクエリの数は無制限ですが、むやみに同時実行数を増やしていくとリソースの競合が激しくなり、全体的なパフォーマンスが低下します。これを防ぐため、Drill ではクエリの「キュー」を用意し、設定した上限までのクエリをキューから取り出して同時実行するしくみがあります。

Enabling Query Queuing
https://drill.apache.org/docs/enabling-query-queuing/

exec.queue.enable プロパティを true にすると、キューが有効になります。Drill では、「large」キューと「small」キューの2つのキューが用意され、それぞれ個別に同時実行数を設定することが可能です。投入したクエリがどちらのキューに入るかは、推定される処理件数のしきい値 exec.queue.threshold プロパティによって決まります。

プロパティ説明デフォルト値
exec.queue.enable キューを有効にするかどうか false
exec.queue.large large キューに入ったクエリの同時実行数 10
exec.queue.small small キューに入ったクエリの同時実行数 100
exec.queue.threshold クエリの推定される処理件数がこの値より大きければ large キュー、小さければ small キューに投入される 30000000
exec.queue.timeout_millis クエリがキューに入ってからこの時間が経過すると失敗する(ミリ秒) 300000

また、個々のクエリ実行の並列度についても調整可能なオプションがいくつかあります。

Configuring Resources for a Shared Drillbit: Configuring Parallelization
https://drill.apache.org/docs/configuring-resources-for-a-shared-drillbit/#configuring-parallelization

プロパティ planner.width.max_per_node はクエリあたりの1ノード(クラスタを構成するマシン1台)で処理できる並列度の最大値です。例えばフィルタリングの処理は、マルチコアマシンでは複数スレッドを使って並列に実行することで効率的に処理を行えます。この設定値はクエリあたりの上限ですが、これとは別に、すべてのクエリの合計の1ノードあたりの並列度は「ノード上のアクティブな Drillbit の数 * ノードのコア数 (Hyper-threadを考慮) * 0.7」を超えることはありません。

プロパティ planner.width.max_per_query はクエリあたりの処理の並列度の最大値です。これはクラスタ全体での並列度になります。実際には、(planner.width.max_per_node * ノード数) と planner.width.max_per_query のどちらか小さい値が、クエリあたりの最大並列度になります。

プロパティ説明デフォルト値
planner.width.max_per_node クエリあたりの1ノードで処理できる並列度の最大値 3
planner.width.max_per_query クエリあたりの処理の並列度の最大値 1000

さらに、planner.memory.max_query_memory_per_node プロパティでノード単位で各クエリが利用できるメモリの最大値を指定することが可能です。

Configuring Drill Memory
https://drill.apache.org/docs/configuring-drill-memory/

Drill はクエリプランの実行に必要なメモリが足りないことが推定される場合、少ないメモリでも実行可能なプランに作り直す場合があります。また、ウィンドウ関数のような外部ソートのための大量のメモリを必要とするクエリでエラーが起きるような場合、この設定値を大きくすることでエラーを回避することができる可能性があります。

プロパティ説明デフォルト値
planner.memory.max_query_memory_per_node ノード単位で各クエリが利用できるメモリの最大値(Byte) 2147483648

上記のプロパティはシステム全体でもセッション単位でも指定できます。例えばシステム全体で指定する場合は

0: jdbc:drill:zk=local> ALTER SYSTEM SET `exec.queue.enable` = true;

セッション単位で指定する場合は

0: jdbc:drill:zk=local> ALTER SESSION SET `exec.queue.enable` = true;

を実行します。

CSV ファイルを Parquet ファイルに変換してクエリを高速化

この記事は Apache Drill Advent Calendar 2015 の8日目の記事です。 

Apache Drill では Apache Parquet という大規模データの分析に適したデータフォーマットを利用することができます。Row-oriented フォーマットにカテゴリ分けされる CSV、TSV といったテキストファイルや伝統的なリレーショナルデータベースのテーブルでは、データは行方向に沿って格納されますが、Column-oriented フォーマットにカテゴリ分けされる Parquet、ORC といった形式のデータは列方向に沿って格納されます。

f:id:nagixx:20151209000303p:plain

データ分析用途には Column-oriented フォーマットが向いているとよく言われますが、データ分析では特定の列の値を集計したり、特定の列の条件を元にフィルタリングやジョインが行われることが多いため、列方向にデータが連続して格納されていると必要なデータのみを効率的に読み込むことができる、ということがその理由です。また、列方向には同じ種類のデータが並んでいるため、圧縮アルゴリズムも効きやすくなります。

必要なスキャン範囲を絞ることを目的として列方向にデータを分割して格納することを垂直パーティショニングと言いますが、Parquet ではさらに内部で行方向に一定の単位でメタ情報を保持することによりスキャン範囲を限定できる水平パーティショニングの機能も持っています。これらの組み合わせにより、クエリの高速化を実現しています。

f:id:nagixx:20151207133252p:plain

さて、Drill で Parquet を扱うための方法ですが、ここからダウンロードした次のような CSV ファイルがあるとします。

$ cat /tmp/MonthlyPassengerData_200507_to_201506.csv
Activity Period,Operating Airline,Operating Airline IATA Code,Published Airline,Published Airline IATA Code,GEO Summary,GEO Region,Activity Type Code,Price Category Code,Terminal,Boarding Area,Passenger Count
200507,ATA Airlines,TZ,ATA Airlines,TZ,Domestic,US,Deplaned,Low Fare,Terminal 1,B,27271
200507,ATA Airlines,TZ,ATA Airlines,TZ,Domestic,US,Enplaned,Low Fare,Terminal 1,B,29131
200507,ATA Airlines,TZ,ATA Airlines,TZ,Domestic,US,Thru / Transit,Low Fare,Terminal 1,B,5415
200507,Air Canada ,AC,Air Canada ,AC,International,Canada,Deplaned,Other,Terminal 1,B,35156

まずヘッダ行の空白をアンダースコアで置換し(Drill は空白を含むカラム名を扱えない)、tr で余分な改行コード(CR)を取り除いた後で、ヘッダ行をカラム名として利用するために、拡張子を csvh に変更します。

$ sed -e '1s/ /_/g' /tmp/MonthlyPassengerData_200507_to_201506.csv | tr -d '\r' > /tmp/MonthlyPassengerData_200507_to_201506.csvh

そして、これを Parquet ファイルに変換するには、store.format プロパティに parquet を指定した状態で  CREATE TABLE AS SELECT(通称 CTAS)を使用します。

$ apache-drill-1.3.0/bin/drill-embedded
0: jdbc:drill:zk=local> ALTER SESSION SET `store.format` = 'parquet';
+-------+------------------------+
|  ok   |        summary         |
+-------+------------------------+
| true  | store.format updated.  |
+-------+------------------------+
1 row selected (0.196 seconds)
0: jdbc:drill:zk=local> CREATE TABLE dfs.tmp.`/airport_data/` AS
SELECT * FROM dfs.`/tmp/MonthlyPassengerData_200507_to_201506.csvh`
+-----------+----------------------------+
| Fragment  | Number of records written  |
+-----------+----------------------------+
| 0_0       | 13901                      |
+-----------+----------------------------+
1 row selected (1.111 seconds)

これだけ。簡単ですね。これで、/tmp/airport_data/ ディレクトリの下に Parquet フォーマットのファイルが保存されます。クエリを実行するには、データソースとしてこのディレクトリを指定します。

0: jdbc:drill:zk=local> SELECT Activity_Period, Operating_Airline, Passenger_Count
. . . . . . . . . . . > FROM dfs.tmp.`/airport_data/`
. . . . . . . . . . . > WHERE CAST(Passenger_Count AS INT) < 5;
+------------------+-----------------------------------+------------------+
| Activity_Period  |         Operating_Airline         | Passenger_Count  |
+------------------+-----------------------------------+------------------+
| 200610           | United Airlines - Pre 07/01/2013  | 2                |
| 200611           | Ameriflight                       | 1                |
| 200611           | Ameriflight                       | 1                |
...

あと、上記の CTAS クエリによる変換だと、元の CSV ファイルから読み込む時にすべてのカラムが VARCHAR 型として扱われてしまうので、後で直接集計などをしたい場合には、次のように Parquet に変換するタイミングでカラムごとにデータ型を指定しておきましょう。

0: jdbc:drill:zk=local> CREATE TABLE dfs.tmp.`/airport_data/` AS SELECT
. . . . . . . . . . . >   CAST(SUBSTR(Activity_Period, 1, 4) AS INT) AS `Year`,
. . . . . . . . . . . >   CAST(SUBSTR(Activity_Period, 5, 2) AS INT) AS `Month`,
. . . . . . . . . . . >   Operating_Airline,
. . . . . . . . . . . >   Operating_Airline_IATA_Code,
. . . . . . . . . . . >   Published_Airline,
. . . . . . . . . . . >   Published_Airline_IATA_Code,
. . . . . . . . . . . >   GEO_Summary,
. . . . . . . . . . . >   GEO_Region,
. . . . . . . . . . . >   Activity_Type_Code,
. . . . . . . . . . . >   Price_Category_Code,
. . . . . . . . . . . >   Terminal,
. . . . . . . . . . . >   Boarding_Area,
. . . . . . . . . . . >   CAST(Passenger_Count AS INT) AS Passenger_Count
. . . . . . . . . . . > FROM dfs.`/tmp/MonthlyPassengerData_200507_to_201506.csvh`;

改行コード (CRLF) に注意

この記事は Apache Drill Advent Calendar 2015 の7日目の記事です。

改行コードの取り扱いは、現時点での Drill の注意事項の一つです。Linux/Mac の環境で生成されたテキストデータであれば問題は起きませんが、Windows 環境で生成されたテキストデータ(もしくは Windows 環境で処理されることを前提としたデータ)を処理するときに、問題になるケースがあります。

例えば、Windows マシンで次のような CSV ファイルを作成します。

1,尾形金弥,オガタキンヤ,0582958162
2,三橋佑奈,ミハシユウナ,0987381855
3,鮫島憲司,サメジマケンジ,0982675562
4,宮野久寛,ミヤノヒサヒロ,0477707528

これを拡張子 .csv をつけて Drill で SELECT * してみると次のようになります。

0: jdbc:drill:zk=local> SELECT * FROM dfs.`/tmp/phone.csv`;
+----------------------------------------+
|                columns                 |
+----------------------------------------+
| ["1","尾形金弥","オガタキンヤ","0582958162\r"]   |
| ["2","三橋佑奈","ミハシユウナ","0987381855\r"]   |
| ["3","鮫島憲司","サメジマケンジ","0982675562\r"]  |
| ["4","宮野久寛","ミヤノヒサヒロ","0477707528\r"]  |
+----------------------------------------+
5 rows selected (0.325 seconds)

各行の配列の最後の要素に改行コード「\r」が含まれてしまっているのが見えます。さらに、配列要素をカラムに分解して表示してみると、

0: jdbc:drill:zk=local> SELECT columns[0] AS id,
. . . . . . . . . . . >        columns[1] AS name,
. . . . . . . . . . . >        columns[2] AS kana,
. . . . . . . . . . . >        columns[3] AS phone
. . . . . . . . . . . > FROM dfs.`/tmp/phone.csv`;
+-----+-------+-----------+--------------+
| id  | name  |   kana    |    phone     |
+-----+-------+-----------+--------------+
  |   | 尾形金弥  | オガタキンヤ    | 0582958162 ← 右端の「|」が無くなって左端のカラムを上書きしている
  |   | 三橋佑奈  | ミハシユウナ    | 0987381855
  |   | 鮫島憲司  | サメジマケンジ   | 0982675562
  |   | 宮野久寛  | ミヤノヒサヒロ   | 0477707528
+-----+-------+-----------+--------------+
5 rows selected (0.667 seconds)

このように改行(CR)のせいで表示がおかしくなる上、次のようなクエリは期待した結果を返さなくなってしまいます。

0: jdbc:drill:zk=local> SELECT columns[1] AS name
. . . . . . . . . . . > FROM dfs.`/tmp/phone.csv`
. . . . . . . . . . . > WHERE columns[3] LIKE '0987381855%';
+-------+
| name  |
+-------+
+-------+
No rows selected (0.373 seconds)

原因は、Windows 環境では改行コードが「\r\n」にもかかわらず、Drill が「\n」のみを改行コードとして扱っているためです。これを回避する単純な方法は tr の利用でしょう。

$ tr -d '\r' < /tmp/phone.csv > /tmp/phone_modified.csv

Drill コミュニティでもこの問題は認識されていて、修正されるまでにはそれほど長くはかからないはずです。

[DRILL-3149] TextReader should support multibyte line delimiters

[DRILL-3726] Drill is not properly interpreting CRLF (0d0a). CR gets read as content.

ただ、それまではちょっと注意が必要なので、念のため。