About Me

My photo
Software Engineer at Starburst. Maintainer at Trino. Previously at LINE, Teradata, HPE.

2019-01-04

Try Apache Griffin

Currently, Griffin doesn't provide a binary distribution, so you need to build it yourself. It's not difficult, though. I tried it on the hortonworks:sandbox-hdp container. Even if you're using the default environment, the only additional steps are adding Spark on Ambari and installing Maven.

# Add Spark
Log in to Ambari (http://localhost:8080/#/login)
Spark2 -> Service Actions -> Start

# Log in to the hortonworks:sandbox-hdp container using bash
docker exec -it sandbox-hdp bash

# Install Maven if it is not already installed there
wget http://ftp.riken.jp/net/apache/maven/maven-3/3.6.0/binaries/apache-maven-3.6.0-bin.tar.gz
tar -xvf apache-maven-3.6.0-bin.tar.gz
mv apache-maven-3.6.0 /opt/apache-maven-3.6.0
vi /etc/environment
  M2_HOME="/opt/apache-maven-3.6.0"
  export PATH=$M2_HOME/bin:$PATH

sudo update-alternatives --install "/usr/bin/mvn" "mvn" "/opt/apache-maven-3.6.0/bin/mvn" 0
sudo update-alternatives --set mvn /opt/apache-maven-3.6.0/bin/mvn

# Install griffin (http://griffin.apache.org/docs/quickstart.html)
wget https://www.apache.org/dist/incubator/griffin/0.3.0-incubating/griffin-0.3.0-incubating-source-release.zip
unzip griffin-0.3.0-incubating-source-release.zip
cd griffin-0.3.0-incubating
mvn clean install
  It takes a few minutes (14 min in my case) and the size will be 660MB.
mv measure/target/measure-0.3.0-incubating.jar /usr/local/griffin-measure.jar

# Prepare demo data
wget --recursive --no-parent http://griffin.apache.org/data/batch
cd griffin.apache.org/data/batch
chmod +x *.sh
./gen_demo_data.sh

hive
CREATE EXTERNAL TABLE `demo_src`(
  `id` bigint,
  `age` int,
  `desc` string)
PARTITIONED BY (
  `dt` string,
  `hour` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '|'
;

CREATE EXTERNAL TABLE `demo_tgt`(
  `id` bigint,
  `age` int,
  `desc` string)
PARTITIONED BY (
  `dt` string,
  `hour` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '|'
;


LOAD DATA LOCAL INPATH 'demo_src' INTO TABLE demo_src PARTITION (dt='20180912',hour='09');
LOAD DATA LOCAL INPATH 'demo_tgt' INTO TABLE demo_tgt PARTITION (dt='20180912',hour='09');

# Create config file
vi env.json
{
  "spark": {
    "log.level": "WARN"
  },
  "sinks": [
    {
      "type": "console"
    },
    {
      "type": "hdfs",
      "config": {
        "path": "hdfs:///griffin/persist"
      }
    },
    {
      "type": "elasticsearch",
      "config": {
        "method": "post",
        "api": "http://es:9200/griffin/accuracy"
      }
    }
  ]
}


vi dq.json
{
  "name": "batch_accu",
  "process.type": "batch",
  "data.sources": [
    {
      "name": "src",
      "baseline": true,
      "connectors": [
        {
          "type": "hive",
          "version": "1.2",
          "config": {
            "database": "default",
            "table.name": "demo_src"
          }
        }
      ]
    }, {
      "name": "tgt",
      "connectors": [
        {
          "type": "hive",
          "version": "1.2",
          "config": {
            "database": "default",
            "table.name": "demo_tgt"
          }
        }
      ]
    }
  ],
  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "accuracy",
        "out.dataframe.name": "accu",
        "rule": "src.id = tgt.id AND src.age = tgt.age AND src.desc = tgt.desc",
        "details": {
          "source": "src",
          "target": "tgt",
          "miss": "miss_count",
          "total": "total_count",
          "matched": "matched_count"
        },
        "out": [
          {
            "type": "metric",
            "name": "accu"
          },
          {
            "type": "record",
            "name": "missRecords"
          }
        ]
      }
    ]
  },
  "sinks": ["CONSOLE", "HDFS"]
}

# Run !
spark-submit --class org.apache.griffin.measure.Application --master yarn --deploy-mode client --queue default --driver-memory 1g --executor-memory 1g --num-executors 2 /usr/local/griffin-measure.jar /env.json /dq.json

# Result
data source timeRanges: src -> (1544668358323, 1544668358323], tgt -> (1544668358323, 1544668358323]
[1544668358323] batch_accu start: application_1544490410222_0011
batch_accu [1544668358323] metrics:
{"name":"batch_accu","tmst":1544668358323,"value":{"total_count":125000,"miss_count":512,"matched_count":124488}}

# Check the above miss_count manually
select
 count(src.id) as total_count
 ,sum(if(tgt.id is null, 1, 0)) as miss_count
 ,sum(if(tgt.id is not null, 1, 0)) as matched_count
from demo_src src
left outer join demo_tgt tgt
on src.id = tgt.id AND src.age = tgt.age AND src.desc = tgt.desc
;

total_count, miss_count, matched_count
125000, 512, 124488

# Generated files on HDFS
[root@sandbox-hdp /]# hdfs dfs -ls /griffin/persist/batch_accu/1544668358323
Found 5 items
-rw-r--r--   1 root hdfs          0 2018-12-13 02:33 /griffin/persist/batch_accu/1544668358323/_FINISH
-rw-r--r--   1 root hdfs        137 2018-12-13 02:33 /griffin/persist/batch_accu/1544668358323/_LOG
-rw-r--r--   1 root hdfs        113 2018-12-13 02:33 /griffin/persist/batch_accu/1544668358323/_METRICS
-rw-r--r--   1 root hdfs         30 2018-12-13 02:32 /griffin/persist/batch_accu/1544668358323/_START

-rw-r--r--   1 root hdfs      44543 2018-12-13 02:33 /griffin/persist/batch_accu/1544668358323/missRecords


hdfs dfs -cat /griffin/persist/batch_accu/1544668358323/_FINISH
(empty)

hdfs dfs -cat /griffin/persist/batch_accu/1544668358323/_LOG
================ log of Thu Dec 16 02:32:38 UTC 2018 ================
--- Thu Dec 16 02:33:18 UTC 2018 ---
process using time: 40444 ms


hdfs dfs -cat /griffin/persist/batch_accu/1544668358323/_METRICS
{"name":"batch_accu","tmst":1544668358323,"value":{"total_count":125000,"miss_count":512,"matched_count":124488}}


hdfs dfs -cat /griffin/persist/batch_accu/1544668358323/_START
application_1544490410222_0011



hdfs dfs -cat /griffin/persist/batch_accu/1544668358323/missRecords
{"id":124,"age":1273,"desc":"1273","dt":"20180912","hour":"09","__tmst":1544668358323}
{"id":124,"age":1065,"desc":"1065","dt":"20180912","hour":"09","__tmst":1544668358323}
{"id":124,"age":1798,"desc":"1798","dt":"20180912","hour":"09","__tmst":1544668358323}