Apache Drillで整数型と浮動小数点型が混じったJSONデータを読む時の注意

こんな感じのJSONデータがあるとします。気温を記録しているセンサーログデータ的なものですね。

$ cat /tmp/sensor.json
{
  "data":[
    {"sensor_id":15, "timestamp":"2015-10-29 08:00:00.004", "temperature":14.8},
    {"sensor_id":15, "timestamp":"2015-10-29 08:05:00.011", "temperature":14.9},
    {"sensor_id":15, "timestamp":"2015-10-29 08:10:00.002", "temperature":15},
    {"sensor_id":15, "timestamp":"2015-10-29 08:15:00.012", "temperature":15.2},
    {"sensor_id":15, "timestamp":"2015-10-29 08:20:00.009", "temperature":15.3}
  ]
}

これを単純にDrillでSELECTしてみようとすると、こんなエラーになってしまいます。

$ apache-drill-1.2.0/bin/drill-embedded
0: jdbc:drill:zk=local> SELECT * FROM dfs.`/tmp/sensor.json`;
Error: DATA_READ ERROR: You tried to write a BigInt type when you are using a ValueWriter of type NullableFloat8WriterImpl.

File  /tmp/sensor.json
Record  1
Line  5
Column  78
Field  temperature
Fragment 0:0

[Error Id: e958b13d-25c8-409c-a7d4-51b4359f40e6 on mbp:31010] (state=,code=0)

「temperature」というフィールドでエラーがあるというのがわかるのですが、これは8ビット浮動小数点型のWriterを値の内部書き込みに使っていたのに、8ビット整数型を書き込もうとしたことが原因です。次のApache Drillのドキュメントを見てみましょう。

Apache Drill - JSON Data Model
https://drill.apache.org/docs/json-data-model/

By default, Drill does not support JSON lists of different types. For example, JSON does not enforce types or distinguish between integers and floating point values. When reading numerical values from a JSON file, Drill distinguishes integers from floating point numbers by the presence or lack of a decimal point. If some numbers in a JSON map or array appear with and without a decimal point, such as 0 and 0.0, Drill throws a schema change error.

Drillでは小数点の有無によって整数型と浮動小数点型を区別しますが、JSONマップや配列内で異なる型の混在を許していないため、異なる型を読み込もうとした時にスキーマが変わった!とエラーを投げてしまうわけです。

これの対策の一つとして、Drillの「store.json.read_numbers_as_double」プロパティをtrueにすることで、すべての数値をDOUBLE型で読み込む方法があります。

0: jdbc:drill:zk=local> ALTER SESSION SET `store.json.read_numbers_as_double` = true;
+-------+---------------------------------------------+
|  ok   |                   summary                   |
+-------+---------------------------------------------+
| true  | store.json.read_numbers_as_double updated.  |
+-------+---------------------------------------------+
1 row selected (0.106 seconds)
0: jdbc:drill:zk=local> SELECT * FROM dfs.`/tmp/sensor.json`;
+------+
| data |
+------+
| [{"sensor_id":15.0,"timestamp":"2015-10-29 08:00:00.004","temperature":14.8},{"sensor_id":15.0,"timestamp":"2015-10-29 08:05:00.011","temperature":14.9},{"sensor_id":15.0,"timestamp":"2015-10-29 08:10:00.002","temperature":15.0},{"sensor_id":15.0,"timestamp":"2015-10-29 08:15:00.012","temperature":15.2},{"sensor_id":15.0,"timestamp":"2015-10-29 08:20:00.009","temperature":15.3}] |
+------+
1 row selected (0.115 seconds)

エラーは無くなりました。JSONドキュメント全体が1フィールドに詰め込まれてしまっているので、次のように展開しましょう。

0: jdbc:drill:zk=local> SELECT
. . . . . . . . . . . >   t.data.sensor_id sensor_id,
. . . . . . . . . . . >   t.data.`timestamp` `timestamp`,
. . . . . . . . . . . >   t.data.temperature temperature
. . . . . . . . . . . > FROM (
. . . . . . . . . . . >   SELECT FLATTEN(data) data FROM dfs.`/tmp/sensor.json`
. . . . . . . . . . . > ) t;
+------------+--------------------------+--------------+
| sensor_id  |        timestamp         | temperature  |
+------------+--------------------------+--------------+
| 15.0       | 2015-10-29 08:00:00.004  | 14.8         |
| 15.0       | 2015-10-29 08:05:00.011  | 14.9         |
| 15.0       | 2015-10-29 08:10:00.002  | 15.0         |
| 15.0       | 2015-10-29 08:15:00.012  | 15.2         |
| 15.0       | 2015-10-29 08:20:00.009  | 15.3         |
+------------+--------------------------+--------------+
5 rows selected (0.143 seconds)

ただ、これ本来整数であるべきsensor_idまで浮動小数点型になってしまってかっこ悪いですね・・。CAST関数を使えば型変換はできるのですが。

0: jdbc:drill:zk=local> SELECT
. . . . . . . . . . . >   CAST(t.data.sensor_id AS INT) sensor_id,
. . . . . . . . . . . >   CAST(t.data.`timestamp` AS TIMESTAMP) `timestamp`,
. . . . . . . . . . . >   t.data.temperature temperature
. . . . . . . . . . . > FROM (
. . . . . . . . . . . >   SELECT FLATTEN(data) data FROM dfs.`/tmp/sensor.json`
. . . . . . . . . . . > ) t;
+------------+--------------------------+--------------+
| sensor_id  |        timestamp         | temperature  |
+------------+--------------------------+--------------+
| 15         | 2015-10-29 08:00:00.004  | 14.8         |
| 15         | 2015-10-29 08:05:00.011  | 14.9         |
| 15         | 2015-10-29 08:10:00.002  | 15.0         |
| 15         | 2015-10-29 08:15:00.012  | 15.2         |
| 15         | 2015-10-29 08:20:00.009  | 15.3         |
+------------+--------------------------+--------------+
5 rows selected (0.337 seconds)

それから別の対策としては、「store.json.all_text_mode」プロパティをtrueにすることで、すべての値を文字列型で読み込む方法もあります。

0: jdbc:drill:zk=local> ALTER SESSION SET `store.json.all_text_mode` = true;
+-------+------------------------------------+
|  ok   |              summary               |
+-------+------------------------------------+
| true  | store.json.all_text_mode updated.  |
+-------+------------------------------------+
1 row selected (0.089 seconds)
0: jdbc:drill:zk=local> SELECT
. . . . . . . . . . . >   t.data.sensor_id sensor_id,
. . . . . . . . . . . >   t.data.`timestamp` `timestamp`,
. . . . . . . . . . . >   t.data.temperature temperature
. . . . . . . . . . . > FROM (
. . . . . . . . . . . >   SELECT FLATTEN(data) data FROM dfs.`/tmp/sensor.json`
. . . . . . . . . . . > ) t;
+------------+--------------------------+--------------+
| sensor_id  |        timestamp         | temperature  |
+------------+--------------------------+--------------+
| 15         | 2015-10-29 08:00:00.004  | 14.8         |
| 15         | 2015-10-29 08:05:00.011  | 14.9         |
| 15         | 2015-10-29 08:10:00.002  | 15           |
| 15         | 2015-10-29 08:15:00.012  | 15.2         |
| 15         | 2015-10-29 08:20:00.009  | 15.3         |
+------------+--------------------------+--------------+
5 rows selected (0.191 seconds)

集計などの操作をしないのであれば、このように値を解釈せずにそのまま読み込む方がシンプルで間違いがないでしょう。もし気温の平均値を求める場合、このまま集計関数を適用すると次のようにエラーになってしまいますので、

0: jdbc:drill:zk=local> SELECT
. . . . . . . . . . . >   t.data.sensor_id sensor_id,
. . . . . . . . . . . >   AVG(t.data.temperature) temperature
. . . . . . . . . . . > FROM (
. . . . . . . . . . . >   SELECT FLATTEN(data) data FROM dfs.`/tmp/sensor.json`
. . . . . . . . . . . > ) t
. . . . . . . . . . . > GROUP BY t.data.sensor_id;
Error: SYSTEM ERROR: SchemaChangeException: Failure while trying to materialize incoming schema.  Errors:
 
Error in expression at index -1.  Error: Missing function implementation: [castINT(BIT-OPTIONAL)].  Full expression: --UNKNOWN EXPRESSION--..

Fragment 0:0

[Error Id: 2626b393-96d8-4d6a-b169-5d787ee57c8d on mbp:31010] (state=,code=0)

集計対象のフィールドはCAST関数を使って浮動小数点型に変換すればうまくいきます。

0: jdbc:drill:zk=local> SELECT
. . . . . . . . . . . >   t.data.sensor_id sensor_id,
. . . . . . . . . . . >   AVG(CAST(t.data.temperature AS DOUBLE)) temperature
. . . . . . . . . . . > FROM (
. . . . . . . . . . . >   SELECT FLATTEN(data) data FROM dfs.`/tmp/sensor.json`
. . . . . . . . . . . > ) t
. . . . . . . . . . . > GROUP BY t.data.sensor_id;
+------------+---------------------+
| sensor_id  |     temperature     |
+------------+---------------------+
| 15         | 15.040000000000001  |
+------------+---------------------+
1 row selected (1.376 seconds)