CSV ファイルを Parquet ファイルに変換してクエリを高速化

この記事は Apache Drill Advent Calendar 2015 の8日目の記事です。 

Apache Drill では Apache Parquet という大規模データの分析に適したデータフォーマットを利用することができます。Row-oriented フォーマットにカテゴリ分けされる CSV、TSV といったテキストファイルや伝統的なリレーショナルデータベースのテーブルでは、データは行方向に沿って格納されますが、Column-oriented フォーマットにカテゴリ分けされる Parquet、ORC といった形式のデータは列方向に沿って格納されます。

f:id:nagixx:20151209000303p:plain

データ分析用途には Column-oriented フォーマットが向いているとよく言われますが、データ分析では特定の列の値を集計したり、特定の列の条件を元にフィルタリングやジョインが行われることが多いため、列方向にデータが連続して格納されていると必要なデータのみを効率的に読み込むことができる、ということがその理由です。また、列方向には同じ種類のデータが並んでいるため、圧縮アルゴリズムも効きやすくなります。

必要なスキャン範囲を絞ることを目的として列方向にデータを分割して格納することを垂直パーティショニングと言いますが、Parquet ではさらに内部で行方向に一定の単位でメタ情報を保持することによりスキャン範囲を限定できる水平パーティショニングの機能も持っています。これらの組み合わせにより、クエリの高速化を実現しています。

f:id:nagixx:20151207133252p:plain

さて、Drill で Parquet を扱うための方法ですが、ここからダウンロードした次のような CSV ファイルがあるとします。

$ cat /tmp/MonthlyPassengerData_200507_to_201506.csv
Activity Period,Operating Airline,Operating Airline IATA Code,Published Airline,Published Airline IATA Code,GEO Summary,GEO Region,Activity Type Code,Price Category Code,Terminal,Boarding Area,Passenger Count
200507,ATA Airlines,TZ,ATA Airlines,TZ,Domestic,US,Deplaned,Low Fare,Terminal 1,B,27271
200507,ATA Airlines,TZ,ATA Airlines,TZ,Domestic,US,Enplaned,Low Fare,Terminal 1,B,29131
200507,ATA Airlines,TZ,ATA Airlines,TZ,Domestic,US,Thru / Transit,Low Fare,Terminal 1,B,5415
200507,Air Canada ,AC,Air Canada ,AC,International,Canada,Deplaned,Other,Terminal 1,B,35156

まずヘッダ行の空白をアンダースコアで置換し(Drill は空白を含むカラム名を扱えない)、tr で余分な改行コード(CR)を取り除いた後で、ヘッダ行をカラム名として利用するために、拡張子を csvh に変更します。

$ sed -e '1s/ /_/g' /tmp/MonthlyPassengerData_200507_to_201506.csv | tr -d '\r' > /tmp/MonthlyPassengerData_200507_to_201506.csvh

そして、これを Parquet ファイルに変換するには、store.format プロパティに parquet を指定した状態で  CREATE TABLE AS SELECT(通称 CTAS)を使用します。

$ apache-drill-1.3.0/bin/drill-embedded
0: jdbc:drill:zk=local> ALTER SESSION SET `store.format` = 'parquet';
+-------+------------------------+
|  ok   |        summary         |
+-------+------------------------+
| true  | store.format updated.  |
+-------+------------------------+
1 row selected (0.196 seconds)
0: jdbc:drill:zk=local> CREATE TABLE dfs.tmp.`/airport_data/` AS
SELECT * FROM dfs.`/tmp/MonthlyPassengerData_200507_to_201506.csvh`
+-----------+----------------------------+
| Fragment  | Number of records written  |
+-----------+----------------------------+
| 0_0       | 13901                      |
+-----------+----------------------------+
1 row selected (1.111 seconds)

これだけ。簡単ですね。これで、/tmp/airport_data/ ディレクトリの下に Parquet フォーマットのファイルが保存されます。クエリを実行するには、データソースとしてこのディレクトリを指定します。

0: jdbc:drill:zk=local> SELECT Activity_Period, Operating_Airline, Passenger_Count
. . . . . . . . . . . > FROM dfs.tmp.`/airport_data/`
. . . . . . . . . . . > WHERE CAST(Passenger_Count AS INT) < 5;
+------------------+-----------------------------------+------------------+
| Activity_Period  |         Operating_Airline         | Passenger_Count  |
+------------------+-----------------------------------+------------------+
| 200610           | United Airlines - Pre 07/01/2013  | 2                |
| 200611           | Ameriflight                       | 1                |
| 200611           | Ameriflight                       | 1                |
...

あと、上記の CTAS クエリによる変換だと、元の CSV ファイルから読み込む時にすべてのカラムが VARCHAR 型として扱われてしまうので、後で直接集計などをしたい場合には、次のように Parquet に変換するタイミングでカラムごとにデータ型を指定しておきましょう。

0: jdbc:drill:zk=local> CREATE TABLE dfs.tmp.`/airport_data/` AS SELECT
. . . . . . . . . . . >   CAST(SUBSTR(Activity_Period, 1, 4) AS INT) AS `Year`,
. . . . . . . . . . . >   CAST(SUBSTR(Activity_Period, 5, 2) AS INT) AS `Month`,
. . . . . . . . . . . >   Operating_Airline,
. . . . . . . . . . . >   Operating_Airline_IATA_Code,
. . . . . . . . . . . >   Published_Airline,
. . . . . . . . . . . >   Published_Airline_IATA_Code,
. . . . . . . . . . . >   GEO_Summary,
. . . . . . . . . . . >   GEO_Region,
. . . . . . . . . . . >   Activity_Type_Code,
. . . . . . . . . . . >   Price_Category_Code,
. . . . . . . . . . . >   Terminal,
. . . . . . . . . . . >   Boarding_Area,
. . . . . . . . . . . >   CAST(Passenger_Count AS INT) AS Passenger_Count
. . . . . . . . . . . > FROM dfs.`/tmp/MonthlyPassengerData_200507_to_201506.csvh`;