About Me

My photo
Software Engineer at Starburst. Maintainer at Trino. Previously at LINE, Teradata, HPE.

2018-10-05

Aster debug the SQLMR

Aster AnalyticsのSQL-MR関数をデバッグする方法です。ログだけで原因が特定できず、更にはサポートチームで再現できない時には非常に役に立つTipsかと思います。
流れとしては対象の関数(jar, zip)をサーバーからダウンロードし、それを読み込んだSQL-MR UDFを作成後にインストール、最後に実行です。

今回Random Forestの実行中に元データが約50MBしか無いにも関わらずJVMのMaxHeapSizeを120GBまで設定してもOOMになるという事象が発生したので、その時のデバッグの流れを記載してみます。
Asterのインタフェースから実行したのはforest_drive関数ですが、AMCでログを確認したところエラーとなっていたのは内部で実行されているforest_builderだったので、その関数を調査のついでに修正してみました。

1. 関数のダウンロード
どこに格納されているかは環境に依存してしまいますが、関数名.zipもしくは関数名.jarでファイルを探せば見つかるはずです。VM環境では以下のパスに存在していました。
/data/ncluster/tmp/cleanonreboot/WorkerDaemon/tasks/218.0/tmp/filedump/public

2. ライブラリをインポートしたプロジェクトを作成
Javaのプロジェクトを作成します。間違えて元の関数を上書きしないよう別名にしておきましょう。今回はforest_builder_debugにしました。
ライブラリの追加はIntelliJであれば、各jarファイルを右クリック→Add as a libraryしていきます。
次に追加した関数のpackageと同じ階層でデバッグ用のコードも合わせ、jarファイルからコピー&ペーストします。

packageはforest_builderであれば以下のようになっていました。
package com.asterdata.sqlmr.analytics.predictive_modeling.decision_tree.forest

3. エラーメッセージから原因の場所を探すとscanRowsメソッド内の以下の部分にたどり着きます。forループの後ろにobjectOutputStream.reset();を追加したところ、大幅にメモリ使用量が抑えられ、50MBのデータも問題なく実行できるようになりました。

try {
    int e = poissonGenr.nextInt();
    ObjectOutputStream objectOutputStream = (ObjectOutputStream)ex.get(var13);

    for(int j = 0; j < e; ++j) {
        objectOutputStream.writeObject(var12);
    }
    objectOutputStream.reset(); // Added
} catch (Error var10) {
    throw new IllegalUsageException("Please increase JVM size to avoid Java heap out of space");
}

4. インストール
actでログインし、インストールコマンドを実行します。ファイル名しか指定できないのでファイルがあるディレクトリまで移動しておく必要があります。
\install forest_builder_debug.jar
(アンインストールの場合は\remove forest_builder_debug.jarです)

5. 実行
実行時はエラーとなった際に使用しているクエリの関数名部分を入れ替えるだけです。

2018-10-04

Bulk Insert to Teradata using Python ​


This snipet is bulk-loading csv to Teradata via Python. Recently teradatasql was released, but this code uses PyTd. If you haven't setup PyTd, please install the library by `pip install teradata`.

import teradata
import csv

udaExec = teradata.UdaExec()
session = udaExec.connect("tdpid")
data = list(csv.reader(open("testExecuteManyBach.csv")))
batchsize = 10000
for num in range(0, len(data), batchsize):
    session.executemany("insert into testExecuteManyBatch values (?, ?, ?, ?)"),
    data[num:num+batchsize], batch=True)


The points are batch=True and specifying batchsize. If you don't set the batchsize or the size is too large, it will be failed (forgot the actual message though). The performance in my environment (1 node) was 10,000 rows/sec. The table has 4 columns. I assume tens of thousands looks fine, but more rows should be imported with FastLoad or MLOAD.