Drawing a real-time Bitcoin chart using Chart.js

(*日本語の記事はこちら)

Chart.js is a popular JavaScript chart library that enables to create dynamic, beautiful charts easily. I recently made chartjs-plugin-streaming, a Chart.js plugin for live streaming data with the auto-scroll feature. It is suitable for IoT-related use cases such as sensor data monitoring, and when I was looking for some real examples of streaming data, I noticed that it is also good for displaying real-time digital currency trading data that the digital currency exchanges provide. So, I just tried that out.

It is becoming more common for the exchanges to deliver trading data efficiently using WebSocket. Some of them require authentication, but in this article, let's create trading charts for the exchanges that provide public API without authentication.

First example: Bitfinex WebSocket API

First, we need to include the following required libraries.

Put the following tags between <head> and </head>.

<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/moment.js/2.18.1/moment.js"></script>
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/Chart.js/2.6.0/Chart.js"></script>
<script type="text/javascript" src="https://github.com/nagix/chartjs-plugin-streaming/releases/download/v1.1.0/chartjs-plugin-streaming.js"></script>
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/pusher/4.1.0/pusher.js"></script>

Next, let's add a canvas to the page. Include an id so that you can identify the chart later.

<canvas id="Bitfinex"></canvas>

Then, prepare an object for buffering data, which is buf. Set the array that has two empty array elements to the property with the same name as canvas's id. The first array is for 'buy' prices and the second is for 'sell' prices.

<script type="text/javascript">
var buf = {};
buf['Bitfinex'] = [[], []];
</script>

Now, let's access the real data. We are going to collect real-time trading data using WebSocket from Bitfinex in Hong Kong, which is one of the largest exchanges by BTC/USD trading volume. You can subscribe trading data of the Bitcoin/US dollar pair by sending a request like below to the URI wss://api.bitfinex.com/ws provided by Bitfinex. (See Bitfinex WebSocket API for details)

{
    "event": "subscribe", // subscribe request
    "channel": "trades",  // trade data
    "pair": "BTCUSD"      // Bitcoin/US dollar
}

The data you will receive through the callback function is like this.

[
    5,             // 0: channel ID
    'te',          // 1: message type
    '1234-BTCUSD', // 2: sequence ID
    1443659698,    // 3: timestamp
    236.42,        // 4: price
    0.49064538     // 5: amount bought (positive) or sold (negative)
]

As we only need the trade type, the timestamp (X-axis) and the price (Y-axis), the filtered data is stored into the buffer on every message receipt using the following code.

var ws = new WebSocket('wss://api.bitfinex.com/ws/');
ws.onopen = function() {
    ws.send(JSON.stringify({      // send subscribe request
        "event": "subscribe",
        "channel": "trades",
        "pair": "BTCUSD"
    }));
};
ws.onmessage = function(msg) {     // callback on message receipt
    var response = JSON.parse(msg.data);
    if (response[1] === 'te') {    // Only 'te' message type is needed
        buf['Bitfinex'][response[5] > 0 ? 0 : 1].push({
            x: response[3] * 1000, // timestamp in milliseconds
            y: response[4]         // price in US dollar
        });
    }
}

Finally, we need to configure the chart. Regarding the details of the chart customization, please refer to the Chart.js official documentation. The key points here are that the 'realtime' scale that the chartjs-plugin-streaming plugin provides are set to the X axis, and that the entire data stored in the buffer is added to the chart in the onRefresh callback function that is called at a regular interval (every second by default).

var id = 'Bitfinex';
var ctx = document.getElementById(id).getContext('2d');
var chart = new Chart(ctx, {
    type: 'line',
    data: {
        datasets: [{
            data: [],
            label: 'Buy',                     // 'buy' price data
            borderColor: 'rgb(255, 99, 132)', // line color
            backgroundColor: 'rgba(255, 99, 132, 0.5)', // fill color
            fill: false,                      // no fill
            lineTension: 0                    // straight line
        }, {
            data: [],
            label: 'Sell',                    // 'sell' price data
            borderColor: 'rgb(54, 162, 235)', // line color
            backgroundColor: 'rgba(54, 162, 235, 0.5)', // fill color
            fill: false,                      // no fill
            lineTension: 0                    // straight line
        }]
    },
    options: {
        title: {
            text: 'BTC/USD (' + id + ')', // chart title
            display: true
        },
        scales: {
            xAxes: [{
                type: 'realtime' // auto-scroll on X axis
            }]
        },
        plugins: {
            streaming: {
                duration: 300000, // display data for the latest 300000ms (5 mins)
                onRefresh: function(chart) { // callback on chart update interval
                    Array.prototype.push.apply(
                        chart.data.datasets[0].data, buf[id][0]
                    );            // add 'buy' price data to chart
                    Array.prototype.push.apply(
                        chart.data.datasets[1].data, buf[id][1]
                    );            // add 'sell' price data to chart
                    buf[id] = [[], []]; // clear buffer
                }
            }
        }
    }
});

Below is the completed chart. You can see that the chart is scrolling from the right to the left slowly.

Bitfinex

Bitstamp WebSocket API

The next example is Bitstamp, an exchange in UK, which is the central trading hub in Europe. Bitstamp uses Pusher, a Pub/Sub messaging library for real time WebSocket streaming. Therefore, the data subscription code is much simpler. (See Bitstamp WebSocket API for details)

The data you will receive through the callback function is like this.

{
    id: 17044523,            // trade unique ID
    amount: 1,               // amount
    price: 2496.21,          // price
    type: 1,                 // trade type (0: buy, 1: sell)
    timestamp: "1499472674", // timestamp
    buy_order_id: 47485421,  //	buy order ID
    sell_order_id: 47485426  //	sell order ID
}

Below is the code that receives data. It also takes only the trade type, the timestamp and the price.

buf['Bitstamp'] = [[], []]; // prepare buffer
var pusher = new Pusher('de504dc5763aeef9ff52'); // Pusher key for Bitstamp
var channel = pusher.subscribe('live_trades'); // subscribe live trade data
channel.bind('trade', function (data) { // callback on message receipt
    buf['Bitstamp'][data.type].push({
        x: data.timestamp * 1000, // timestamp in milliseconds
        y: data.price             // price in US dollar
    });
});

The configuration of the chart is the same as above except for the id. Below is the completed chart.

Bitstamp

BTC-E WebSocket API

BTC-E is also a popular exchage located in Bulgaria. It uses Pusher for streaming as well. (See BTC-E WebSocket API for details)

Below is the data you will receive.

[
    [
        "buy",       // 0: trade type
        "2476.999",  // 1: price
        "0.08863539" // 2: amount
    ]
]

As timestamps are not included in case of BTC-E, set the current time using Date.now().

buf['BTC-E'] = [[], []]; // prepare buffer
var pusher = new Pusher('c354d4d129ee0faa5c92'); // Pusher key for BTC-E
var channel = pusher.subscribe('btc_usd.trades'); // subscribe BTC/USD trades
channel.bind('trades', function (dataset) { // callback on message receipt
    dataset.forEach(function(data) {
        buf['BTC-E'][data[0] === 'buy' ? 0 : 1].push({
            x: Date.now(), // timestamp in milliseconds
            y: data[1]     // price in US dollar
        });
    });
});

BTC-E

BitMEX WebSocket API

BitMEX is an exchange in Hong Kong, which is famous for high leverage trading. It provides a normal WebSocket API. Below are the request and the message to be received. (See BitMEX WebSocket API for details)

{
    "op": "subscribe", // subscribe request
    "args": [
        "trade:XBTUSD" // Bitcoin/US dollar
    ]
}
{
    table: "trade",
    action: "insert",
    data: [
        {
            timestamp: "2017-07-09T01:39:30.866Z", // timestamp
            symbol: "XBTUSD",         // currency pair symbol
            side: "Buy",              // trade type
            size: 34,                 // amount
            price: 2548.9,            // price
            tickDirection: "ZeroPlusTick", // tick direction
            trdMatchID: "34d6de97-5d54-3431-e505-ffc3bc8c58ef",
            grossValue: 2039284,      // gross value 
            homeNotional: 0.02039284, // notional in Bitcoin
            foreignNotional: 52       // notional in US dollar
        }
    ]
}

The code and chart are as follows.

buf['BitMEX'] = [[], []]; // prepare buffer
var ws = new WebSocket('wss://www.bitmex.com/realtime');
ws.onopen = function() {
    ws.send(JSON.stringify(    // send subscribe request
       "op": "subscribe",
       "args": [
            "trade:XBTUSD"
        ]
    }));
};
ws.onmessage = function(msg) { // callback on message receipt
    var response = JSON.parse(msg.data);
    response.data.forEach(function(data) {
        buf['BitMEX'][data.side === 'Buy' ? 0 : 1].push({
            x: data.timestamp, // timestamp
            y: data.price      // price in US dollar
        });
    });
}

BitMEX

CoinCheck WebSocket API

The last one is Japanese exchange, CoinCheck. Only BTC/Japanese-yen rates are delivered here, so let's display them. Below are the request and the message to be received. (See CoinCheck WebSocket API for details)

{
    "type": "subscribe",        // subscribe request
    "channel": "btc_jpy-trades" // Bitcoin/Japanese yen
}
[
    9856377,    // trade ID
    "btc_jpy",  // currency pair
    "289544.0", // price
    "0.0367",   // amount
    "sell"      // trade type
]

The code and chart are as follows.

buf['CoinCheck'] = [[], []];
var ws = new WebSocket('wss://ws-api.coincheck.com/');
ws.onopen = function() {
    ws.send(JSON.stringify({        // send subscribe request
        "type": "subscribe",
        "channel": "btc_jpy-trades"
    }));
};
ws.onmessage = function(msg) { // callback on message receipt
    var response = JSON.parse(msg.data);
    buf['CoinCheck'][response[4] === 'buy' ? 0 : 1].push({
        x: Date.now(), // timestamp in milliseconds
        y: response[2] // price in Japanese yen
    });
}

CoinCheck WebSocket API

There are many other digital currency exchanges that provide API to deliver real-time trading data. The API and data format are slightly different each other, but you can draw a chart with minimum modification. I hope this article helps you try out for yourself.

Bitcoin のリアルタイムチャートを Chart.js で表示する

(*English translation is here)

Chart.js は動的で美しいチャートを手軽に作ることができるポピュラーな JavaScript ライブラリです。先日、リアルタイムストリーミングデータの表示に便利な自動スクロール機能を実装した chartjs-plugin-streaming プラグインを作りました。センサーデータのモニタリング等の IoT 関連の用途に最適ですが、何かリアルなストリーミングデータないかなー、と探していたところ、仮想通貨の取引所が配信している取引価格の表示にもぴったりな感じなので、早速試してみました。

最近は WebSocket を使って効率的にストリーミング配信を行う取引所も増えてきており、中には認証を必要とする場合もあるのですが、本記事では特に認証の不要なパブリック API を提供している取引所の取引チャートを作ってみましょう。

最初の例: Bitfinex WebSocket API

まずは必要なライブラリを組み込みます。

下記を<head>〜</head>にでも書いておきます。

<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/moment.js/2.18.1/moment.js"></script>
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/Chart.js/2.6.0/Chart.js"></script>
<script type="text/javascript" src="https://github.com/nagix/chartjs-plugin-streaming/releases/download/v1.2.0/chartjs-plugin-streaming.js"></script>
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/pusher/4.1.0/pusher.js"></script>

次に、ページに Canvas を設置します。チャートを区別できるように、id をつけておきます。

<canvas id="Bitfinex"></canvas>

そして、データをバッファリングしておくためのオブジェクト buf を用意します。先ほど設置した Canvas の id と同じ名前のプロパティに、2つの空の配列要素を持つ配列を用意しておきます。1つ目の要素は買い値用、2つ目は売り値用です。

<script type="text/javascript">
var buf = {};
buf['Bitfinex'] = [[], []];
</script>

いよいよ実際のデータを取りに行く番です。WebSocket を使って、対米ドル取引で最大手の取引所 Bitfinex(香港)のリアルタイム取引データを取得します。Bitfinex が用意している URI wss://api.bitfinex.com/ws に次のような内容のリクエストを送信することで、ビットコイン/米ドルのペアの取引データを購読(Subscribe)することができます。(Bitfinex WebSocket API の詳細はこちら

{
    "event": "subscribe", // 購読リクエスト
    "channel": "trades",  // 取引データ
    "pair": "BTCUSD"      // ビットコイン/米ドル
}

コールバック関数を介して取得できるデータは次のような形式です。

[
    5,             // 0: チャンネル ID
    'te',          // 1: メッセージタイプ
    '1234-BTCUSD', // 2: シーケンス ID
    1443659698,    // 3: タイムスタンプ
    236.42,        // 4: 価格
    0.49064538     // 5: 取引量(買いならプラス、売りならマイナス)
]

必要なデータは売り買いの種別、タイムスタンプ(X軸)、価格(Y軸)だけなので、次のようなコードで更新があるたびにバッファにデータを追加していきます。

var ws = new WebSocket('wss://api.bitfinex.com/ws/');
ws.onopen = function() {
    ws.send(JSON.stringify({      // 購読リクエストを送信
        "event": "subscribe",
        "channel": "trades",
        "pair": "BTCUSD"
    }));
};
ws.onmessage = function(msg) {     // メッセージ更新時のコールバック
    var response = JSON.parse(msg.data);
    if (response[1] === 'te') {    // メッセージタイプ 'te' だけを見る
        buf['Bitfinex'][response[5] > 0 ? 0 : 1].push({
            x: response[3] * 1000, // タイムスタンプ(ミリ秒)
            y: response[4]         // 価格(米ドル)
        });
    }
}

最後にチャートの設定です。Chart.js のチャートのカスタマイズについては公式ドキュメントを参考にしてもらうとして、ここでのポイントは chartjs-plugin-streaming プラグインにより追加される 'realtime' タイプの目盛りを X 軸に設定していることと、定期的に(デフォルトでは1秒に1回)呼び出される onRefresh コールバック関数の中で、バッファに溜まっているデータを丸々チャートに追加しているところです。

var id = 'Bitfinex';
var ctx = document.getElementById(id).getContext('2d');
var chart = new Chart(ctx, {
    type: 'line',
    data: {
        datasets: [{
            data: [],
            label: 'Buy',                     // 買い取引データ
            borderColor: 'rgb(255, 99, 132)', // 線の色
            backgroundColor: 'rgba(255, 99, 132, 0.5)', // 塗りの色
            fill: false,                      // 塗りつぶさない
            lineTension: 0                    // 直線
        }, {
            data: [],
            label: 'Sell',                    // 売り取引データ
            borderColor: 'rgb(54, 162, 235)', // 線の色
            backgroundColor: 'rgba(54, 162, 235, 0.5)', // 塗りの色
            fill: false,                      // 塗りつぶさない
            lineTension: 0                    // 直線
        }]
    },
    options: {
        title: {
            text: 'BTC/USD (' + id + ')', // チャートタイトル
            display: true
        },
        scales: {
            xAxes: [{
                type: 'realtime' // X軸に沿ってスクロール
            }]
        },
        plugins: {
            streaming: {
                duration: 300000, // 300000ミリ秒(5分)のデータを表示
                onRefresh: function(chart) { // データ更新用コールバック
                    Array.prototype.push.apply(
                        chart.data.datasets[0].data, buf[id][0]
                    );            // 買い取引データをチャートに追加
                    Array.prototype.push.apply(
                        chart.data.datasets[1].data, buf[id][1]
                    );            // 売り取引データをチャートに追加
                    buf[id] = [[], []]; // バッファをクリア
                }
            }
        }
    }
});

出来上がったチャートはこのような感じ。ゆっくりスクロールしているのがわかるでしょうか。

Bitfinex

Bitstamp WebSocket API

次は欧州での取引の中心となっているイギリスの取引所 Bitstamp の例です。Bitstamp ではデータのストリーミングに Pub/Sub メッセージングライブラリ Pusher を利用しています。このため、データの取得コードはよりシンプルです。(Bitstamp WebSocket API の詳細はこちら

コールバック関数を介して取得できるデータは次のような形式です。

{
    id: 17044523,            // 取引固有 ID
    amount: 1,               // 取引量
    price: 2496.21,          // 価格
    type: 1,                 // 取引タイプ(0: 買い、1: 売り)
    timestamp: "1499472674", // タイムスタンプ
    buy_order_id: 47485421,  //	買い注文 ID
    sell_order_id: 47485426  //	売り注文 ID
}

データの取得コードは次の通りです。こちらも売り買いの種別、タイムスタンプ、価格のみを見ています。

buf['Bitstamp'] = [[], []]; // バッファを用意
var pusher = new Pusher('de504dc5763aeef9ff52'); // Bitstamp 専用のキーを指定
var channel = pusher.subscribe('live_trades'); // ライブ取引データを購読
channel.bind('trade', function (data) { // データ更新時のコールバック
    buf['Bitstamp'][data.type].push({
        x: data.timestamp * 1000, // タイムスタンプ(ミリ秒)
        y: data.price             // 価格(米ドル)
    });
});

チャートの設定は id の部分を変えるだけで全く同じです。完成したチャートはこの通り。

Bitstamp

BTC-E WebSocket API

続いてこちらも有名なブルガリアの取引所 BTC-E。BTC-E でも Pusher を使った配信が行われています。(BTC-E WebSocket API の詳細はこちら

データの形式は次の通り。

[
    [
        "buy",       // 0: 取引タイプ
        "2476.999",  // 1: 価格
        "0.08863539" // 2: 取引量
    ]
]

BTC-E ではタイムスタンプが流れてこないので、Date.now() で現在の時刻をセットしています。

buf['BTC-E'] = [[], []]; // バッファを用意
var pusher = new Pusher('c354d4d129ee0faa5c92'); // BTC-E 専用のキー
var channel = pusher.subscribe('btc_usd.trades'); // 対米ドル取引データを購読
channel.bind('trades', function (dataset) { // データ更新時のコールバック
    dataset.forEach(function(data) {
        buf['BTC-E'][data[0] === 'buy' ? 0 : 1].push({
            x: Date.now(), // タイムスタンプ(ミリ秒)
            y: data[1]     // 価格(米ドル)
        });
    });
});

BTC-E

BitMEX WebSocket API

ハイレバレッジ取引で有名な香港の BitMEX です。ここは素の WebSocket API です。リクエストとデータの形式は次の通りです。(BitMEX WebSocket API の詳細はこちら

{
    "op": "subscribe", // 購読リクエスト
    "args": [
        "trade:XBTUSD" // ビットコイン/米ドル
    ]
}
{
    table: "trade",
    action: "insert",
    data: [
        {
            timestamp: "2017-07-09T01:39:30.866Z", // タイムスタンプ
            symbol: "XBTUSD",   // 通貨ペアシンボル
            side: "Buy",        // 取引タイプ
            size: 34,           // 取引量
            price: 2548.9,      // 価格
            tickDirection: "ZeroPlusTick", // Tick の方向
            trdMatchID: "34d6de97-5d54-3431-e505-ffc3bc8c58ef",
            grossValue: 2039284,      // 総価値
            homeNotional: 0.02039284, // 想定元本(ビットコイン)
            foreignNotional: 52       // 想定元本(米ドル)
        }
    ]
}

コードとチャートはこのようになります。

buf['BitMEX'] = [[], []]; // バッファを用意
var ws = new WebSocket('wss://www.bitmex.com/realtime');
ws.onopen = function() {
    ws.send(JSON.stringify(    // 購読リクエストを送信
       "op": "subscribe",
       "args": [
            "trade:XBTUSD"
        ]
    }));
};
ws.onmessage = function(msg) { // メッセージ更新時のコールバック
    var response = JSON.parse(msg.data);
    response.data.forEach(function(data) {
        buf['BitMEX'][data.side === 'Buy' ? 0 : 1].push({
            x: data.timestamp, // タイムスタンプ
            y: data.price      // 価格(米ドル)
        });
    });
}

BitMEX

CoinCheck WebSocket API

最後は日本の取引所 CoinCheck です。対日本円のデータのみが配信されているので、それを表示してみましょう。リクエストとデータの形式は次の通りです。(CoinCheck WebSocket API の詳細は こちら

{
    "type": "subscribe",        // 購読リクエスト
    "channel": "btc_jpy-trades" // ビットコイン/日本円
}
[
    9856377,    // 取引 ID
    "btc_jpy",  // 通貨ペア
    "289544.0", // 価格
    "0.0367",   // 取引量
    "sell"      // 取引タイプ
]

コードとチャートは次の通りです。

buf['CoinCheck'] = [[], []];
var ws = new WebSocket('wss://ws-api.coincheck.com/');
ws.onopen = function() {
    ws.send(JSON.stringify({        // 購読リクエストを送信
        "type": "subscribe",
        "channel": "btc_jpy-trades"
    }));
};
ws.onmessage = function(msg) { // メッセージ更新時のコールバック
    var response = JSON.parse(msg.data);
    buf['CoinCheck'][response[4] === 'buy' ? 0 : 1].push({
        x: Date.now(), // タイムスタンプ(ミリ秒)
        y: response[2] // 価格(日本円)
    });
}

CoinCheck

これ以外にも、多くの取引所がリアルタイムデータを配信する API を提供しています。それぞれ微妙に形式が異なりますが、最小限の変更でチャートの表示ができますので、ご興味をお持ちの方は参考にしてください。

HDFS Snapshot + distcp と MapR-FS Volume Mirroring の違い

MapR は HDFS の代わりに MapR-FS を使用している Hadoop ディストリビューションです。性能の向上信頼性の向上ランダムリードライト可能なNFSNoSQL データベースとの統合メッセージングキューとの統合、・・・と MapR-FS のメリットは挙げればきりがないのですが、HDFS API はそのまま利用できるため、すべての Hadoop アプリケーションやライブラリは違いを意識することなく動作します。

さて、Hadoop クラスタを運用する際に、データ更新を行う業務アプリケーションと、参照がメインの分析アプリケーション間で同じデータを共有したい、というケースはよくあると思います。ただし、分析アプリはデータセットの特定の時点の一貫性のあるスナップショットに対して処理を行うべきであるため、任意の時点で更新が発生する業務アプリのデータセットにそのままアクセスするわけにはいきません。また、分析アプリが業務アプリと同時に動作した場合、負荷の増大により業務アプリの処理時間に影響を与えてしまうリスクもあります。

このような場合に、 Hadoop では特定のディレクトリのスナップショットを取得する HDFS Snapshot 機能と、分散データコピーツール distcp を使い、物理的なコピーをクラスタ内、もしくは別クラスタに作成し、特定の時点のスナップショットの複製に対してアクセスを行うことで、上記の課題を解決する方法があります。

しかし MapR にはこれと同じことをより簡単に、より効率良く行なうための Volume Mirroring 機能があります。しかもこの機能は MapR の 2011 年の最初のリリースから存在しており、安定して運用に利用されてきた実績があります。

以下では、それぞれの動作の違いについて比較してみましょう。

HDFS Snapshot + distcp の場合

手順の詳細は上記の記事を見ていただければと思いますが、同じ例を使って説明します。まず、source ディレクトリに Data.txt が存在します。

f:id:nagixx:20160410152218j:plain

hdfs コマンドで source ディレクトリのスナップショットを作ります。 

f:id:nagixx:20160410152407j:plain

distcp ツールでスナップショット s1 を target ディレクトリにコピーします。このコマンドは MapReduce ジョブを起動し、クラスタ内で並列にデータコピーが行われます。

f:id:nagixx:20160410152516j:plain

hdfs コマンドで target ディレクトリのスナップショットを作ります。これで 1 世代目のスナップショットの複製が全体コピーで作成されました。

f:id:nagixx:20160410152846j:plain

次に source ディレクトリに Data2.txt が更新データとして書き込まれました。

f:id:nagixx:20160410153010j:plain

hdfs コマンドで source ディレクトリの 2 世代目のスナップショットを作ります。

f:id:nagixx:20160410153317j:plain

distcp ツールでコピーを行いますが、今度はスナップショット s1 と s2 の内容を比較し、ファイル間で更新があったものだけの差分コピーが行われます。

f:id:nagixx:20160410153449j:plain

hdfs コマンドで target ディレクトリのスナップショットを作ります。これで 2 世代目のスナップショットの複製が差分コピーで作成されました。3 世代目以降はこれの繰り返しです。

f:id:nagixx:20160410153802j:plain

MapR-FS Volume Mirroring の場合

上記のようなデータの差分更新のサイクルを回す場合、MapR-FS では Volume という論理的な管理単位を作成しておき、ファイルシステム上に配置しておくことで運用をシンプルにできます。

Volume Mirroring 機能では、コピーの単位は Volume 全体ですので、source 側、mirror 側それぞれにあらかじめ Volume を作成しておきます。Volume を作成するには次の管理コマンドを実行します。mirror 側には <source ボリューム名>@<source クラスタ名> の形でコピー元となる Volume を指定します。

$ maprcli volume create -name source -path /user/hadoop/source
$ maprcli volume create -name target -path /user/hadoop/target -source source@demo.mapr.com -type mirror

先ほどの例と同様に、はじめに source ディレクトリに Data.txt が存在します。

f:id:nagixx:20160410154632j:plain

さて、スナップショットを取りたいタイミングで、MapR の場合は次のコマンド 1 つを実行するだけで、Mirror Volume にスナップショットのコピーが作られるところまで完了します。シンプルですね。

$ maprcli volume mirror start -name target

内部では、まず source 側では一時的なスナップショット、target 側ではタイムスタンプのついたスナップショットが同時に作成されます。その後、source の一時的なスナップショットの内容が target ボリュームにコピーされます。ここでのポイントは、コピーはバックグラウンドで非同期に行われること、コピー処理は MapReduce を起動せず、MapR-FS プロセスが行うこと、target 側ではコピー途中のファイルは見えず、完了した時点で初めてアクセスできるようになることです。コピー完了時に、source 側の一時的なスナップショットは削除されます。

f:id:nagixx:20160410160242j:plain

次に source ディレクトリに Data2.txt が更新データとして書き込まれました。

f:id:nagixx:20160410172243j:plain

2 世代目のスナップショットを作成する場合も、1 世代目とまったく同じコマンドを実行します。シンプルですね。

$ maprcli volume mirror start -name target

両方の Volume 内にスナップショットが作られるところまでは同じですが、今度は前回のコピー時から差分のあったデータだけをコピーします。HDFS ではファイル単位でしか差分を比較していませんが、MapR-FS では 8KB のデータブロック単位で更新履歴を管理しているため、実際に変更のあったファイルの一部分のみのコピーが行われます。

第 3 世代以降もこれの繰り返しです。

f:id:nagixx:20160410161843j:plain

さらに、この Mirroring のコピーのタイミングはスケジューリング設定によって完全に自動化できるため(target 側に残るスナップショットの有効期限も設定できる)、マニュアル操作不要の運用が行えます。

MapR-FS だと何がいいのか

まとめると、MapR-FS Volume Mirroring では下記のようなメリットがあり、運用上の複数の課題の解決に役立ちます。

  • 運用に必要な手順が少ないため、運用設計及び実行の負担が減る
    • オペレーションミスのリスクも減らせる
  • HDFS + MapReduce という 2 つのレイヤにまたがる処理ではなく、MapR-FS で完結する処理であるためシンプルで効率的
    • コピーで MapReduce ジョブが起動しないため、クラスタのリソースの消費を最低限に押さえられる
    • 他の業務のアプリケーションへの影響が少ない
    • 障害時の内部的な挙動もシンプルになりリスクを減らせる
  • MapR-FS の粒度の細かいデータブロック管理を最大限に活用し、短時間でコピーを完了
    • 8KBデータブロック単位の差分管理
    • MapR-FS の圧縮、チェックサムが適用された状態で信頼性が高く効率の良いコピーが可能
  • アトミックな操作で一貫性のあるスナップショットを利用可能
    • HDFS のスナップショットは NameNode 上のメタデータを利用したイメージの取得であるため、構造上データそのものとの完全な一貫性が保証できない。一方、MapR-FS ではデータとメタデータが同期された一貫性のあるスナップショットを取得できる
    • MapR-FS Volume Mirroring のコピーはアトミックな動作であるため、バックグラウンドでコピー処理実行中でも、source 側、target 側双方で Point-in-Time の一貫性のあるデータセットへのアクセスができる

2016 年に向けた注目の新機能の開発状況

この記事は Apache Drill Advent Calendar 2015 の25日目の記事です。

2015年もあとわずか。今回は Drill の JIRA チケットや GitHub を眺めつつ、2016年にどんな新機能が出てきそうか興味のおもむくままにご紹介しましょう。

Cassandra ストレージプラグイン

[DRILL-92] Cassandra storage engine

初期パッチが出て議論が行われたものの、ここしばらく動きが中断しちゃっていますね。

How to Use Apache Drill with Cassandra - Stack Overflow

こちらの議論を見ると、Drill や Cassandra の開発が進んだ結果、当初のパッチがもう使えなくなってしまっているようです。誰かー。

Couchbase ストレージプラグイン

jacques-n/drill-couchbase-plugin · GitHub
drill/contrib/storage-couchbase at couchbase-storage-plugin · tgrall/drill · GitHub

2つの作りかけの実装があります。下の方は Couchbase の N1QL を利用しているようです。

Solr/Elasticsearch ストレージプラグイン

[DRILL-3585] Apache Solr as a storage plugin
[DRILL-3637] Elasticsearch storage plugin

Solr/Elasticsearch インデックスも構造はドキュメントデータベースみたいなものですからね。SQL でクエリを投げられれば便利です。

Kudu ストレージプラグイン

[DRILL-4241] Add Experimental Kudu plugin

高速な分析に最適化したストレージ Apache Kudu に対応するストレージプラグインです。

XML フォーマットのサポート

[DRILL-3878] Support XML Querying (selects/projections, no writing)

JSON にクエリができるなら XML もできるだろうということで。JSONRecordReader をフックしている感じでしょうか。

HTTPD ログフォーマットのサポート

[DRILL-3423] Add New HTTPD format plugin

HTTPD ログを変換なしに直接 SQL で処理できるというプラグインです。これもあると便利ですね。

Excel フォーマットのサポート

[DRILL-3738] Create StoragePlugin for Excel files (.xlsx or possibly .xls) - version 1 - read only.

Apache POI を使って Excel のファイルに対応させようという試みです。企業にはたくさんありますからねえ。

INSERT INTO TABLE サポート

[DRILL-3534] Insert into table support

前から課題にはあがっていると思うのですが、実装が進んでいるのかどうかはイマイチ不明です。

分析関数の追加

[DRILL-3962] Add support of ROLLUP, CUBE, GROUPING SETS, GROUPING, GROUPING_ID, GROUP_ID support

BI 分析ワークロードに欠かせない分析関数の追加です。Drill が内部で使っている Apache Calcite にはすでに実装があるので、Drill でサポートするのはそんなに困難ではないはずです。

地理空間データのクエリ

[DRILL-3914] Support geospatial queries

PostgreSQL 向け PostGIS のような感じの、地理空間データのクエリをサポート。内部で ESRI Geometry API for Java を使うのは定番のようです。すでに 1.3 で基本的な機能は master に取り込まれているっぽいです。

JDK 8 サポート

[DRILL-1491] Support for JDK 8

結構よく聞かれる JDK 8 のサポート。まだいくつかのサブタスクが残っているようです。JDK 7 のサポート切れからしばし経過する一方 JDK 8 の導入が進んでいるので、早い対応が望まれますね。

YARN サポート

[DRILL-1170] YARN support for Drill

負荷に合わせて動的に YARN コンテナを増減するというしくみを目指す取り組み。だけどもこれもまだ動きが無いような。

Drill のユーザ認証とインパーソネーション

この記事は Apache Drill Advent Calendar 2015 の24日目の記事です。

今回は、Drill のセキュリティを向上させる2つの機能、インパーソネーションとユーザ認証について紹介します。

インパーソネーション

Drill には、クライアントから要求されたアクションを、クライアント自身の権限で実行する「インパーソネーション(Impersonation)」機能があります。デフォルトでは無効になっていますが、設定ファイル conf/drill-override.conf を編集して有効にすることができます。

インパーソネーションが有効になっている場合、Drill はユーザーの認証情報(Credential)をファイルシステムに渡し、ファイルシステムはそのユーザがデータアクセスに対する適切な権限を持っているかのチェックを行います。逆にインパーソネーションが無効になっている場合、Drill はすべてのクライアントからの要求を、Drillbit サービスを起動したユーザの権限で実行することになります。Drillbit を起動するユーザは通常は特権ユーザですが、この場合クライアントのユーザごとの細かいアクセス制御ができないことになります。

インパーソネーションを有効にするには、次のように設定ファイルを編集し、Drillbit を再起動します。

$ vi conf/drill-override.conf
drill.exec: {
  cluster-id: "cluster_name",
  zk.connect: "<hostname>:<port>,<hostname>:<port>,<hostname>:<port>",
  impersonation: {
    enabled: true
  }
}

下記の表は、Drill のどの機能がインパーソネーションに対応しているかを示しています。

タイプ対応機能非対応の機能
クライアント SQLLine, ODBC, JDBC Drill Web コンソール, REST API
ストレージプラグイン ファイルシステム Hive, HBase
クエリ

インパーソネーションが有効のとき、設定はデータとメタデータ両方のアクセスに適用されます。例えば、SHOW SCHEMAS コマンドを発行した場合、Drill はログインしたユーザの権限でメタデータにアクセスを試みます。また、ワークスペースに対する SELECT クエリを発行した場合は、ログインしたユーザの権限でデータにアクセスを試みます。インパーソネーションが適用されるコマンドは次の通りです:

  • SHOW SCHEMAS
  • SHOW DATABASES
  • SHOW TABLES
  • CREATE TABLE AS SELECT
  • SELECT
  • CREATE VIEW
  • DROP VIEW
  • SHOW FILES

CREATE TABLE AS SELECT および CREATE VIEW コマンドを実行するには、ユーザはテーブルやビューを保存するディレクトリの書き込み権限を持っている必要があることに注意してください。

 

ユーザ認証

一方で、ユーザを正しく識別するためにはユーザ認証が欠かせません。Drill では Linux PAM(Pluggable Authentication Module)によるユーザ名/パスワードをベースにした認証をサポートしています。認証は JDBC/ODBC 接続を介して行われます。PAM はインストールされたどの PAM 認証エンティティにも対応するため、ローカル OS のパスワードファイルはもとより LDAP などにも幅広く対応します。

もしインパーソネーションが有効になっていれば、ユーザ認証と併用することで、どのユーザがどのファイルにアクセスできるかということを制御できることになり、Drill システム全体のセキュリティが向上します。

PAM を利用したユーザ認証を行うには、すべての Drill ノードでユーザのユーザ名、UID、パスワードが一致している必要があります。また、/etc/passwd を使う場合には、Drillbit を起動するユーザはすべてのノードで shadow グループに所属している必要があります。

それでは、PAM 認証の設定の手順を追っていきます。まず下記のサイトから Linux 用の JPam をダウンロードします。

http://sourceforge.net/projects/jpam/files/jpam/jpam-1.1/

tar.gz を展開し、libjpam.so を取り出してどこかのディレクトリに配置します(例えば /opt/pam)。そして conf/drill-env.sh を編集して libjpam.so があるディレクトリのパスを次のように指定します。

$ vi conf/drill-env.sh
export DRILLBIT_JAVA_OPTS="-Djava.library.path=/opt/pam/"

さらに conf/drill-override.conf に次の設定を加えます。

$ vi conf/drill-override.conf
drill.exec: {
  security.user.auth: {
    enabled: true,
    packages += "org.apache.drill.exec.rpc.user.security",
    impl: "pam",
    pam_profiles: [ "sudo", "login" ]
  } 
}

これで Drillbit を再起動すれば、ユーザ認証が有効になります。

sqlline を使うクライアントからアクセスするには、次のようにユーザ名(-n オプション)とパスワード(-p オプション)を指定します。

$ sqlline –u jdbc:drill:zk=10.10.11.112:5181 –n bob –p bobdrill

もしパスワードを隠したい場合には、sqlline に入った後に !connect コマンドで接続します。

$ sqlline
sqlline> !connect jdbc:drill:zk=localhost:2181
scan complete in 1385ms
Enter username for jdbc:drill:zk=localhost:2181: bob
Enter password for jdbc:drill:zk=localhost:2181: *************

最後に、Drill の特権ユーザについて説明を加えておきます。ユーザ認証を有効にした場合、Drill の特権ユーザのみに次のような操作が許されることになります。

  • ALTER SYSTEM コマンドによるシステムレベルのオプションの変更
  • REST API もしくは Web UI によるストレージプラグイン設定の変更
  • すべてのクエリプロファイルの閲覧
  • すべての実行中のクエリのキャンセル

特権ユーザは次のうちのいずれかです。

  • システムオプション security.admin.users で指定されたユーザ(ALTER SYSTEM にて変更可能)
  • システムオプション security.admin.user_groups で指定されたグループに所属するユーザ(ALTER SYSTEM にて変更可能)
  • Drillbit を起動したユーザ

Parquet のパーティショニングによる性能最適化

この記事は Apache Drill Advent Calendar 2015 の23日目の記事です。

Drill では Parquet フォーマットを使うことによって、パーティションプルーニングによる性能上のメリットを得ることができます。パーティションプルーニングとは、アクセスするパーティションを WHERE 句の条件により自動的に絞り込む機能で、これにより処理の効率化が期待されます。

パーティションプルーニング機能を使うには、まずテーブルをパーティションに分けておく必要があります。PARTITION BY 句をつけて CTAS(CREATE TABLE AS SELECT)操作を行うことでパーティション化したテーブルを作成します。

Parquet の書き込み処理では、まずパーティションキーによるソートが行われ、パーティションカラムに新しい値を見つける毎にファイルが作られます。Drill は同じディレクトリ内にパーティション毎のファイルを作りますが、1つのファイルには1つのパーティションキーの値しか含みません。ただし、1つのパーティションキーに対してファイルが複数になることはあります。

では、例を見てみましょう。元データは最近よく使う個人情報 CSV ファイルです。

$ head -n 3 personal_information_50m.csv 
1,碓井卓也,ウスイタクヤ,0,0984963105,takuya57105@rrqec.sq,889-0507,宮崎県,延岡市,旭ケ丘,2-17,,ミヤザキケン,ノベオカシ,アサヒガオカ,2-17,,1985/08/13,29,東京都,AB,836,"FBQv,9nv"
2,脇田勇三,ワキタユウゾウ,0,0554838431,yuuzou679@ydomu.ee,400-0121,山梨県,甲斐市,牛句,4-2-12,牛句ロイヤルパレス213,ヤマナシケン,カイシ,ウシク,4-2-12,ウシクロイヤルパレス213,1978/04/28,36,沖縄県,A,862,kp1fDUPQ
3,立川円香,タチカワマドカ,1,0221247688,madoka_tachikawa@uzppymocdm.za,980-0855,宮城県,仙台市青葉区,川内澱橋通,4-1-13,川内澱橋通コーポ303,ミヤギケン,センダイシアオバク,カワウチヨドミバシドオリ,4-1-13,カワウチヨドミバシドオリコーポ303,1990/01/29,24,鹿児島県,A,881,XtFKa9kL

これを CTAS で Parquet に変換します。この例では、都道府県を格納するカラム address1 をパーティションキーに指定しています。

$ apache-drill-1.4.0/bin/sqlline -u jdbc:drill:zk=node1:5181
0: jdbc:drill:zk=node1:5181> CREATE TABLE dfs.tmp.`/personal_information/`
. . . . . . . . . . . . . .> PARTITION BY (address1)
. . . . . . . . . . . . . .> AS SELECT
. . . . . . . . . . . . . .>   CAST(columns[0] AS INT) AS id,
. . . . . . . . . . . . . .>   columns[1] AS name,
. . . . . . . . . . . . . .>   CAST(columns[3] AS INT) AS sex,
. . . . . . . . . . . . . .>   columns[4] AS phone,
. . . . . . . . . . . . . .>   columns[5] AS email,
. . . . . . . . . . . . . .>   columns[6] AS zip,
. . . . . . . . . . . . . .>   columns[7] AS address1,
. . . . . . . . . . . . . .>   columns[8] AS address2,
. . . . . . . . . . . . . .>   columns[9] AS address3,
. . . . . . . . . . . . . .>   columns[10] AS address4,
. . . . . . . . . . . . . .>   columns[11] AS address5,
. . . . . . . . . . . . . .>   TO_DATE(columns[17], 'yyyy/MM/dd') AS birth_date
. . . . . . . . . . . . . .> FROM dfs.`/personal_information_50m.csv`;
+-----------+----------------------------+
| Fragment  | Number of records written  |
+-----------+----------------------------+
| 1_4       | 5200254                    |
| 1_8       | 5200257                    |
| 1_5       | 5200239                    |
| 1_2       | 5200257                    |
| 1_7       | 5200248                    |
| 1_1       | 5277845                    |
| 1_3       | 6240297                    |
| 1_0       | 6240310                    |
| 1_6       | 6240293                    |
+-----------+----------------------------+
9 rows selected (557.927 seconds)

実行結果のテーブルを見ると、9つの Minor Fragment が並列にデータを書き出したことが読み取れます。出力先のディレクトリには以下のようにファイルができています。ファイル名先頭の「1_x」は書き出し処理を行った Minor Fragment の番号、次の「_yy」はパーティションの番号です(ここでは都道府県でパーティショニングしたので48個に分かれていますね)。

$ ls tmp/personal_information/
1_0_1.parquet   1_1_45.parquet  1_3_37.parquet  1_5_29.parquet  1_7_20.parquet
1_0_10.parquet  1_1_46.parquet  1_3_38.parquet  1_5_3.parquet   1_7_21.parquet
1_0_11.parquet  1_1_47.parquet  1_3_39.parquet  1_5_30.parquet  1_7_22.parquet
1_0_12.parquet  1_1_48.parquet  1_3_4.parquet   1_5_31.parquet  1_7_23.parquet
1_0_13.parquet  1_1_5.parquet   1_3_40.parquet  1_5_32.parquet  1_7_24.parquet
1_0_14.parquet  1_1_6.parquet   1_3_41.parquet  1_5_33.parquet  1_7_25.parquet
...

元の CSV と Parquet のファイルサイズを比べると、Parquet ファイルの圧縮効果が一目瞭然です。Parquet おそるべし!

$ du -sm personal_information_50m.csv tmp/personal_information/
12308	personal_information_50m.csv
3258	tmp/personal_information/

では WHERE 句の条件つきのクエリを実行してみます。

0: jdbc:drill:zk=node1:5181> SELECT name, address1, address2, address3, address4, address5, birth_date
. . . . . . . . . . . . . .> FROM dfs.tmp.`/personal_information/`
. . . . . . . . . . . . . .> WHERE address1 = _UTF16'神奈川県' AND
. . . . . . . . . . . . . .>       address2 = _UTF16'横須賀市' AND
. . . . . . . . . . . . . .>       birth_date > '1985-01-01'
. . . . . . . . . . . . . .> LIMIT 5;
+--------+-----------+-----------+-----------+-----------+---------------+-------------+
|  name  | address1  | address2  | address3  | address4  |   address5    | birth_date  |
+--------+-----------+-----------+-----------+-----------+---------------+-------------+
| 高見善次郎  | 神奈川県      | 横須賀市      | 馬堀海岸      | 2-5-10    |               | 1987-06-13  |
| 福田渚    | 神奈川県      | 横須賀市      | 船越町       | 2-12-8    | 船越町ステーション105  | 1986-11-24  |
| 辻清作    | 神奈川県      | 横須賀市      | 平和台       | 2-11      | 平和台テラス202     | 1985-09-20  |
| 河田果凛   | 神奈川県      | 横須賀市      | 久村        | 2-18      | ザ久村305        | 1991-08-23  |
| 尾上謙治   | 神奈川県      | 横須賀市      | グリーンハイツ   | 1-17-16   |               | 1992-04-05  |
+--------+-----------+-----------+-----------+-----------+---------------+-------------+
5 rows selected (31.381 seconds)

肝心なのは、実際にどんなプランが実行されたかです。EXPLAIN PLAN FOR もしくは Drill Web UI でクエリプランを見てみます。するとスキャン対象のファイルが「1_x_34.parquet」という名前の 9 つのみであることがわかります。「神奈川県」をキーとするデータは 34 番のパーティションに入っており、スキャン対象がきちんと絞り込まれていることを確認できました。

00-00    Screen
00-01      Project(name=[$0], address1=[$1], address2=[$2], address3=[$3], address4=[$4], address5=[$5], birth_date=[$6])
00-02        SelectionVectorRemover
00-03          Limit(fetch=[5])
00-04            UnionExchange
01-01              SelectionVectorRemover
01-02                Limit(fetch=[5])
01-03                  Project(name=[$3], address1=[$0], address2=[$1], address3=[$4], address4=[$5], address5=[$6], birth_date=[$2])
01-04                    SelectionVectorRemover
01-05                      Filter(condition=[AND(=($0, _UTF-16LE'神奈川県'), =($1, _UTF-16LE'横須賀市'), >($2, '1985-01-01'))])
01-06                        Project(address1=[$4], address2=[$6], birth_date=[$2], name=[$0], address3=[$5], address4=[$1], address5=[$3])
01-07                          Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=/tmp/personal_information/1_1_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_5_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_2_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_6_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_0_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_4_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_8_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_3_34.parquet], ReadEntryWithPath [path=/tmp/personal_information/1_7_34.parquet]], selectionRoot=maprfs:/tmp/personal_information, numFiles=9, usedMetadataFile=false, columns=[`address1`, `address2`, `birth_date`, `name`, `address3`, `address4`, `address5`]]])

以前の記事でも解説したように、Parquet は Column-oriented フォーマットであるため、各ファイルのスキャンにおいても SELECT 対象のカラムの部分に絞り込んで読み込みを行っています。圧縮効果によるファイルサイズの縮小とも合わせ、大きなパフォーマンス向上の効果が見込めるでしょう。

Drill Web UI のビジュアルなクエリプロファイル

この記事は Apache Drill Advent Calendar 2015 の22日目の記事です。

Drill のパフォーマンスチューニングに役立つ情報の一つはクエリプラン、そしてもう一つはクエリプロファイルです。今回はクエリプロファイルでどんな情報が見られるかを紹介していきましょう。

今回使うサンプルデータはこんな感じです。まずは 5,000万人分の個人情報が入った CSV ファイル約13GB(ダミー情報です)。

$ head -n 3 personal_information_50m.csv 
1,碓井卓也,ウスイタクヤ,0,0984963105,takuya57105@rrqec.sq,889-0507,宮崎県,延岡市,旭ケ丘,2-17,,ミヤザキケン,ノベオカシ,アサヒガオカ,2-17,,1985/08/13,29,東京都,AB,836,"FBQv,9nv"
2,脇田勇三,ワキタユウゾウ,0,0554838431,yuuzou679@ydomu.ee,400-0121,山梨県,甲斐市,牛句,4-2-12,牛句ロイヤルパレス213,ヤマナシケン,カイシ,ウシク,4-2-12,ウシクロイヤルパレス213,1978/04/28,36,沖縄県,A,862,kp1fDUPQ
3,立川円香,タチカワマドカ,1,0221247688,madoka_tachikawa@uzppymocdm.za,980-0855,宮城県,仙台市青葉区,川内澱橋通,4-1-13,川内澱橋通コーポ303,ミヤギケン,センダイシアオバク,カワウチヨドミバシドオリ,4-1-13,カワウチヨドミバシドオリコーポ303,1990/01/29,24,鹿児島県,A,881,XtFKa9kL

そしてゆるキャラ情報が入った JSON ファイル約1KB。

$ cat yuru.json 
{
  "address1":"熊本県",
  "address2":"",
  "character":"くまモン",
}
{
  "address1":"千葉県",
  "address2":"船橋市",
  "character":"ふなっしー",
}
{
  "address1":"奈良県",
  "address2":"",
  "character":"せんとくん"
}
...

これを3ノードからなる Hadoop分散ファイルシステムに配置し、各ノードでは Drillbit を稼働しておきます。クエリは、ゆるキャラごとにその住所に住んでいる人数をカウントするというシンプルな処理です。

SELECT
  b.`character` AS `ゆるキャラ`,
  b.address1 as `都道府県`,
  b.address2 as `市区町村`,
  count(*) as `人数`
FROM
  dfs.`/personal_information_50m.csv` AS a
JOIN
  dfs.`/yuru.json` AS b
ON
  (b.address1 = a.columns[7]) AND
  (b.address2 = a.columns[8] OR b.address2 = '') AND
  b.`character` <> ''
GROUP BY b.`character`, b.address1, b.address2;

まず Drill の Web UI の「Query」タブでこの SQL を Submit します。

f:id:nagixx:20151222192355p:plain

クエリの実行が始まり、しばらくすると結果が表示されます。

f:id:nagixx:20151222192744p:plain

ここで「Profiles」タブをクリックすると、完了したクエリの一覧が表示されます。ただし、表示されるクエリはアクセスしている Web UI のノードが Foreman(クエリを受け付けたノード)となったクエリだけですので、それが必ずしもクエリを投入したノードになるとは限らないことにご注意を。対象のクエリが表示されなかったら、ほかの Drillbit ノードの UI も見てみましょう。

f:id:nagixx:20151222193619p:plain

ここで対象のクエリをクリックすると、そのクエリのプロファイル情報が表示されます。ここには大きく分けて次のような情報があります。

  • クエリとそのプラン(Query and Planning)
  • クエリのプロファイル(Query Profile)
  • Fragment のプロファイル(Fragment Profiles)
  • オペレータのプロファイル(Operator Profiles)
  • 完全な JSON 形式のプロファイル(Full JSON Profile)

f:id:nagixx:20151222193855p:plain

これらを見ていく前に、プランをビジュアルに表示する「Visualized Plan」タブの内容が便利なので、まず見てみます。ここではプランの非循環有向グラフ(DAG)が色分けされて表示されます。実際の処理は下から上に向けて流れていきます。

グラフの各ノードはスキャン、フィルタ、集約などの処理を表すオペレータで、Major Fragment ごとに異なる色が付けられています。よくみると、Major Fragment はクラスタノード間でデータの交換が行われる Exchange オペレータを境に分割されていることがわかります。

また、各ノードについている番号は、前回の記事でも出てきた <Major Fragment ID>-<Operator ID> を示しています。

f:id:nagixx:20151222195038p:plain

さて、Fragment Profiles の欄に目を移すと、各 Major Fragment の処理がどのように並列化され、各処理にどれだけ時間がかかったかがわかるタイムラインの表示があります。それぞれの色は Visualized Plan の Major Fragment の色に対応しており、並列度が大きいほど上下方向の幅が太く表示されることになります。

下の例では、水色の Major Fragment 04 の処理は一瞬で完了し、緑色の Major Fragment 03 の処理はある程度の並列度でしばらく進んだ後、若干のばらつきがありつつ完了していることがわかります。もしこの Major Fragment の表示が極端な段差を示している場合には、データの偏りやデータローカリティの問題が疑われます。

f:id:nagixx:20151223030602p:plain

さらにもう少し詳細に Major Fragment の内容を見ていくには、Fragment Profiles の「Major Fragment: <Major Fragment ID>-xx-xx」と書かれた帯をクリックすると、詳細が展開表示されます。

各行の先頭についている番号は <Major Fragment ID>-<Minor Fragment>-xx を表しています。Minor Fragment というのは、Major Fragment を並列化した一つ一つの処理のことです。各 Minor Fragment の実行ノードや処理時間、処理レコード数、ピークメモリ量などを見ることができます。

下の例では、Major Fragment 03 が 9 並列に分割され、3ノードに均等に割り当てられていることがわかります。

f:id:nagixx:20151223031816p:plain

ところで上記の Minor Fragment の情報では並列に実行される様子がわかりましたが、Minor Fragment 内に含まれている個々のオペレータがどれくらいのリソースを使って実行されたかまではわかりません。これを確認するためには Operator Profiles に注目します。

各行の先頭についている番号は <Major Fragment ID>-xx-<Operator ID> です。そして各オペレータのセットアップ時間、処理時間、待ち時間、ピークメモリ量などを見ることができます。ここではオペレータに焦点を置いているため Minor Fragment の区別はしていませんが、最小値、平均値、最大値は Minor Fragment 間で集約した結果の値です。

f:id:nagixx:20151223033352p:plain

もしさらに詳細な Minor Fragment ごとのオペレータのプロファイルが欲しい場合は、Operator Profiles の「<Major Fragment ID>-xx-<Operator ID> - <Operator Name>」が書かれた帯をクリックします。すると、<Major Fragment ID>-<Minor Fragment>-<Operator ID> のレベルまで細分化されたプロファイルを得ることができます。

f:id:nagixx:20151223034516p:plain

クエリ処理のどの部分でどれだけ時間やリソースを消費しているかを把握することは、パフォーマンスチューニングの基本中の基本ですので、まずはこの UI をマスターしましょう。