ビットの海

ゆるふわソフトウェアエンジニアしゃぜのブログ

AWS JavaSDKを使ってEMRでjsonログをごにょっとするメモ

はじめに

  • Webの資料も少ないEMRと、AWS JavaSDKを利用して、jsonログをHiveから処理するための流れについて簡単にまとめました。
  • jdbc接続そのものの記述も省略しています。(DataSouceの定義など)

想定する構成

構成図

f:id:shase428:20160404192714p:plain

各種ソフトウェアのバージョン

  • JDK 1.8
  • EMR AMI 4.x
  • AWS JavaSDK 1.10.40
  • Spring Boot 1.x

一般的な注意点

  • Webの情報の新しさに注意
    • EMR AMIは3系と4系でかなり違っています。情報を参照する場合は、EMR AMIのバージョンに注意してください。

EMRクラスタの作成と削除

サンプルコード

EMRクラスタは、EMR APIを利用して作成することができます。クラスタ起動から、シャットダウンまでのサンプルコードを下記に記載します。

Client.Java

public class Client {

    public static void main(String args[]) {
        EmrService emrService = new EmrService();

        // 1.EMRクライアントの初期化
        emrService.init();
        // 2.EMRクラスタの起動
        String jobFlowId = emrService.lunchCluster();
        // 3.EMRクラスタの状態確認(起動中、待機中、など)
        emrService.getClusterStatus(jobFlowId);
        // 4.EMRクラスタのシャットダウン
        emrService.terminateCluster(jobFlowId);
    }

}

EmrService.java


import java.util.ArrayList;
import java.util.List;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.internal.StaticCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.ActionOnFailure;
import com.amazonaws.services.elasticmapreduce.model.Application;
import com.amazonaws.services.elasticmapreduce.model.DescribeClusterRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeClusterResult;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.PlacementType;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsRequest;

public class EmrService {

    private AmazonElasticMapReduceClient emrClient;

    /**
    * AmazonElasticMapReduceClientを生成する
    */
    public void init() {
        AWSCredentials cred = new BasicAWSCredentials("accessKey", "secretKey");
        AWSCredentialsProvider credentialsProvider = new StaticCredentialsProvider(cred);
        emrClient = new AmazonElasticMapReduceClient(credentialsProvider);
        // リージョンを指定
        emrClient.setRegion(Region.getRegion(Regions.AP_NORTHEAST_1));
    }

    /**
    * クラスタの起動
    *
    * @return
    */
    public String lunchCluster() {


        Application hive = new Application();
        hive.withName("Hive");// hiveのinstall

        RunJobFlowRequest request = new RunJobFlowRequest()
                .withName("clusterName") // クラスタ名
                .withApplications(hive)
                .withLogUri("s3buket") // S3のバケットのpath
                .withReleaseLabel("emr-4.2.0") // EMR AMIバージョン
                .withServiceRole("EMR_DefaultRole")
                .withJobFlowRole("EMR_EC2_DefaultRole")
                .withVisibleToAllUsers(true)
                .withSteps(this.buildStepConfig().toArray(new StepConfig[0]))
                .withInstances(new JobFlowInstancesConfig()
                        .withInstanceCount(2) // インスタンス数
                        .withKeepJobFlowAliveWhenNoSteps(true)
                        .withPlacement(new PlacementType().withAvailabilityZone("ap-northeast-1a"))
                        .withEc2KeyName("ec2-key-name")
                        .withMasterInstanceType("master-instance-type")
                        .withSlaveInstanceType("slave-instance-type"));

        RunJobFlowResult result = emrClient.runJobFlow(request);
        return result.getJobFlowId(); // jobFlowIdは停止などに必要
    }

    private List<StepConfig> buildStepConfig() {
        List<StepConfig> result = new ArrayList<>();

        String[] args = {
                "s3-dist-cp"
                , "--src", "s3のpath"
                , "--dest", "hdfsのpath"
                , "--groupBy", "正規表現"
                , "--targetSize", "MB指定"
        };

        StepConfig s3DistCpStep = new StepConfig()
                .withName("S3distCp")
                .withActionOnFailure(ActionOnFailure.CONTINUE)
                .withHadoopJarStep(
                        new HadoopJarStepConfig()
                                .withJar("command-runner.jar")
                                .withArgs(args)
                );

        result.add(s3DistCpStep); // サンプルなので、1つしか定義してないが、複数定義可能
        return result;
    }

    /**
    * クラスタの状態を確認する
    *
    * @param jobFlowId
    */
    public void getClusterStatus(String jobFlowId) {
        DescribeClusterResult describeClusterResult = emrClient.describeCluster(new DescribeClusterRequest().withClusterId(jobFlowId));
        System.out.println(describeClusterResult.getCluster().getStatus().toString());
        System.out.println(describeClusterResult.getCluster().getMasterPublicDnsName());
    }

    /**
    * クラスタを停止させる
    *
    * @param jobFlowId
    */
    public void terminateCluster(String jobFlowId) {
        emrClient.terminateJobFlows(new TerminateJobFlowsRequest().withJobFlowIds(jobFlowId));
    }

}

S3から、HDFSへのログのコピー

S3上のログファイルに対して、直接hiveテーブルを作成し、操作することも可能ですが、速度が犠牲になってしまいます。他の要素とのトレードオフですが、今回は、s3-dist-cpを利用して、s3上のログファイルを一定サイズまでマージして、HDFSにコピーすることとします。

実際の処理は、EMRのjobStepで行います。下記のドキュメントを参照してください。 http://docs.aws.amazon.com/ElasticMapReduce/latest/ReleaseGuide/UsingEMR_s3distcp.html

SDKを利用した実装のサンプルは下記になります。(前記、EmrService.javaの抜粋です)


    /**
    * クラスタの起動
    *
    * @return
    */
    public String lunchCluster() {

        Application hive = new Application();
        hive.withName("Hive");

        RunJobFlowRequest request = new RunJobFlowRequest()
                .withName("clusterName") // クラスタ名
                .withApplications(hive)
                .withLogUri("s3buket") // S3のバケットのpath
                .withReleaseLabel("emr-4.2.0") // EMR AMIバージョン
                .withServiceRole("EMR_DefaultRole")
                .withJobFlowRole("EMR_EC2_DefaultRole")
                .withVisibleToAllUsers(true)
                .withSteps(this.buildStepConfig().toArray(new StepConfig[0]))
                .withInstances(new JobFlowInstancesConfig()
                        .withInstanceCount(2) // インスタンス数
                        .withKeepJobFlowAliveWhenNoSteps(true)
                        .withPlacement(new PlacementType().withAvailabilityZone("ap-northeast-1a"))
                        .withEc2KeyName("ec2-key-name")
                        .withMasterInstanceType("master-instance-type")
                        .withSlaveInstanceType("slave-instance-type"));

        RunJobFlowResult result = emrClient.runJobFlow(request);
        return result.getJobFlowId(); // jobFlowIdは停止などに必要
    }

    private List<StepConfig> buildStepConfig() {
        List<StepConfig> result = new ArrayList<>();

        String[] args = {
                "s3-dist-cp"
                , "--src", "s3のpath"
                , "--dest", "hdfsのpath"
                , "--groupBy", "正規表現"
                , "--targetSize", "MB指定"
        };

        StepConfig s3DistCpStep = new StepConfig()
                .withName("S3distCp")
                .withActionOnFailure(ActionOnFailure.CONTINUE)
                .withHadoopJarStep(
                        new HadoopJarStepConfig()
                                .withJar("command-runner.jar")
                                .withArgs(args)
                );

        result.add(s3DistCpStep); // サンプルなので、1つしか定義してないが、複数定義可能
        return result;
    }

JDBCからHiveクエリの実行

EMRで、クラスタが起動後、Hadoop上の、Hiveの操作に移ります。

JDBCアクセスするための事前準備

EMR上のhiveに対し、JDBCアクセスをするためには、専用のドライバをアプリケーションに組み込む必要があります。詳細については、下記のドキュメントを参考にしてください。 http://docs.aws.amazon.com/ElasticMapReduce/latest/ReleaseGuide/HiveJDBCDriver.html

上記ドキュメントに添付されている、

Hive 1.0 JDBC drivers (driver version 1.0.4): https://amazon-odbc-jdbc-drivers.s3.amazonaws.com/public/AmazonHiveJDBC_1.0.4.1004.zip

こちらのzipファイルを解凍すると、readmeと、jarファイルがいくつか添付されてきます。 こちらのjarを直接Javaアプリケーションに組み込んで仕様してください。

jsonログのパーサーを用意

EMRで、jsonログを読み込む際に、Webの多くの資料では、 s3://elasticmapreduce/samples/hive-ads/libs/jsonserde.jar を使う手順が紹介されていると思います。

しかし、こちらは、複雑なjsonに対応できていない、古いバグが放置されている、などの問題がありました。

そこで、今回は、serdeのライブラリとして下記をビルドして使います。

https://github.com/rcongiu/Hive-JSON-Serde

こちらから以下の手順でjarをつくり、s3上の適当な場所に配置して、hiveから利用してください。

git clone https://github.com/rcongiu/Hive-JSON-Serde.git
cd Hive-JSON-Serde
mvn -Dmaven.test.skip=true package

アプリケーションから、EMRへのconnectivity

JDBC接続するためには、IP接続可能な状態でなければいけません。hiveクエリ発行前に、sshトンネルを利用して、hadoopのmasterにセッションを作成します。下記にサンプルコードを記述します。 sshトンネルのために、jschを利用しています。

import java.util.Date;

import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;


  // クラスタ起動時渡されるMasterPublicDnsNameを仕様します
    public void execute(String masterPublicDnsName) {
            Session session = null;

            String sshUser = "hadoop";
            int sshPort = 22;

            String strRemoteHost = "127.0.0.1";
            int localPort = 10005;
            int remotePort = 10000;

            final JSch jsch = new JSch();
            jsch.addIdentity("AWSで利用できるEC2用pemファイルのpath");
            session = jsch.getSession(sshUser, masterPublicDnsName, sshPort);

            final Properties config = new Properties();
            config.put("StrictHostKeyChecking", "no");
            session.setConfig(config);

            try {
                // ssh接続
                session.connect();
                // portフォワード
                session.setPortForwardingL(localPort, strRemoteHost, remotePort);
                //todo: hiveクエリの発行など
            } catch (Exception e) {
                //todo:
            } finally {
        // sessionをとじる
                session.disconnect();
            }
  }

Hiveテーブルの作成

JDBC経由で以下のようなコードでhiveテーブルを作成します。 HDFS上のログは、HDFS上に、yyyyMMddのディレクトリで、日付ごとに格納されていると想定しています。 よって、日付単位でのパーティションを作成しています。

private JdbcTemplate jdbcTemplate; // springのjdbcTemplate事前に生成しておく

public void createTargetTable(Date targetDate) {
  jdbcTemplate.execute("add jar " + "s3のパス" + "json-serde-1.3.7-SNAPSHOT-jar-with-dependencies.jar");

  jdbcTemplate.execute("drop table if exists sample_table");

  StringBuilder createTable = new StringBuilder();
  createTable.append("CREATE EXTERNAL TABLE IF NOT EXISTS sample_table ( ");
  createTable.append(" Value string,");
  createTable.append(" Id bigint,");
  createTable.append(" )");
  createTable.append(" PARTITIONED BY ( PT STRING )");
  createTable.append(" row format serde 'org.openx.data.jsonserde.JsonSerDe'");
  createTable.append(" with serdeproperties ('paths'='Id,Value')");

  jdbcTemplate.execute(createTable.toString());

  ZonedDateTime targetDay = targetDate.toInstant().atZone(ZoneId.systemDefault());
  ZonedDateTime nextDay = DateUtils.addDays(targetDate, 1).toInstant().atZone(ZoneId.systemDefault());

  StringBuilder partitionTargetday = new StringBuilder();
  partitionTargetday.append("ALTER TABLE sample_table ADD PARTITION ( pt='");
  partitionTargetday.append(targetDay.format(DateTimeFormatter.ISO_LOCAL_DATE));
  partitionTargetday.append("' ) LOCATION '");
  partitionTargetday.append("HDFS上のパス");
  partitionTargetday.append(targetDay.format(DateTimeFormatter.ofPattern("yyyyMMdd")));
  partitionTargetday.append("/'");

  jdbcTemplate.execute(partitionTargetday.toString());

}

JDBC経由でのクエリ発行

テーブル作成後は、通常のJDBC接続で、RDBに接続するように、Hiveクエリの発行が可能です。 実行例を下記に記します。

// 含まれているデータベース一覧を取得
jdbcTemplate.execute("show dabases");

参考資料

プログラミング Hive

プログラミング Hive

いまさらiphone6sに機種変しました

f:id:shase428:20160402234159j:image

auからクーポン来たので、そろそろ5cも辛くなってきたし、SEにするかぁ、と思って行ったら結果6sにしてた。。。

なんか大きい画面を一度見てしまうと、どうにもSEにする気になれず。

しっかしキャリアでふつーに契約すると月額料金相変わらず、すごいことになるよねぇ。MVNOちょっと考えよう(´・_・`)

infrastructure as codeとredfish apiのメモ

概要

(redshiftだと思った人はごめん。)

先日この記事を読みました。

私は Infrastructure as Code をわかっていなかった - メソッド屋のブログ

infrastructure as codeは、VM環境の場合は、VMのコントロールをすること、になるだろうし(例えば、AWSSDKなどから)、物理環境の場合だと、リモートから物理サーバ起動して、bios(UEFI)の設定して、みたいなことになるんだろうと思っています。従来担っていたのは、IPMIなどの仕組みですね。

ふと、2014年ごろに、話題になった、アレ(ポストIPMIが期待される規格)はその後どうなったのかと思い出しました。

News & Trend - データセンターの運用を簡単に、インテルなど大手6社が標準を策定へ:ITpro

結論から言ってしまうと、ポストIPMIは、2015年に、redfish apiという規格にまとまっていたのでした。

REDFISH | DMTF

Dell/HP/Intel/Emerson が構築する Redfish は、IPMI 以来のスタンダードになり得るのか? | Agile Cat --- in the cloud

ざっとネット見た感じ、すでにHP iLO4とかで使えるらしいですね。

概要はこれを見れば良いと思います。以下はただの余談です。

Redfish: サーバ管理のための新しいAPI

余談

IPMIを使ったことのある人はわかると思うのですが、アレでなにかしようととすると、当たり前ですが、低レイヤな知識をかなり要求されるので、現代のゆるふわゆとりエンジニア(俺)には辛いものがあります。たぶん、みんな辛いと思っていたので、Webの世界で一般的なRestfulのAPIで、ハードウェアの操作ができたらいいね、っていうことらしいです。

前述の記事にもあるけど、大手のvendorが組んでできたのが、refish apiの規格。

ここにjsonのmockがあります。

Simple Rack-mounted Server | Redfish Resource Explorer

出力見るとIPMIっぽいですね。

詳細はSPECどうぞ

REDFISH | DMTF

といってもボリュームあって、辛いので、導入としてははこの資料がわかりやすいです。

https://www.dmtf.org/sites/default/files/Introduction_to_Redfish_2015.pdf

幾つか拾うと

  • 今時SOAPじゃなくて、RESTだろう常考
  • 人が見やすい形式でoutputするよ
  • OCP Remote Machine Managementに準拠してるよ
  • IPMIをRetrieveしてるよ

って感じで、ああ、今風を目指したんだなーと。あと、欧米はpythonjava好きだなーという感想をうけましたw

おわりに

各vendorのツールが、redfishに準拠すれば、データセンターの辛いことも、少しは辛くなくなるのかもしれないですね。

ただ、今や(自分も含め)クラウドの普及によって、実際のハードウェアと向き合う人の数は相当減っているので、IPMIと同じくらいか、それ以下のニッチなナレッジにとどまるかもしれないなぁ、とは思います。

実機を触れる幸せな方は試してみていかがでしょうか。

IPMIさんおつかれさまでした。

Linuxデバイスドライバプログラミング

Linuxデバイスドライバプログラミング

部屋の温度をMackerelで眺める

前回の「Raspberry Pi 2 Model Bで部屋の温度計を作ってみよう」で温度計を作ってみたので、外出先でも、その値を参照できるように今回は監視システムに飛ばしてみます。

最初、zabbixを自分で立てることも考えたんですが、スマホで見えると便利だよなーと思い直し、今回はMackerelのフリー版を使ってみます。

ラズパイにMackerel Agentを導入

こちらを参考にしました。とても簡単ですねー。 qiita.com

# tar.gzをdownload
wget https://github.com/mackerelio/mackerel-agent/releases/download/v0.28.0/mackerel-agent_linux_arm.tar.gz

# 解凍
tar xvfz mackerel-agent_linux_arm.tar.gz

# optの下に移動
sudo mv mackerel-agent_linux_arm /opt/mackerel-agent

さて、ここまでやったら、下記のURLから、MackerelのAPIキーを確認します。

https://mackerel.io/orgs/{organization名}#apikey

設定ファイルをつくります。

sudo sh << SCRIPT
cat >>/opt/mackerel-agent/mackerel-agent.conf <<'EOF';
apikey = "{APIキー}"
EOF
SCRIPT

カスタムメトリクス用の設定

前回「Raspberry Pi 2 Model Bで部屋の温度計を作ってみよう」で、動くようにしたコマンドを使って、簡単な出力をするシェルをつくります。

vi /home/pi/temper-agent.sh
#!/bin/sh

ID="temperature"
TEMP=`/home/pi/temper/temper |awk -F ',' '{print $2}'`
DATE=`env TZ=JST date +%s`

echo "${ID}\t${TEMP}\t${DATE}"
chmod +x /home/pi/temper-agent.sh

agentの設定ファイルに下記を追記

vi /opt/mackerel-agent/mackerel-agent.conf
[plugin.metrics.temperature]
command = "/home/pi/temper-agent.sh"

Mackerel Agent起動

sudo nohup /opt/mackerel-agent/mackerel-agent --conf=/opt/mackerel-agent/mackerel-agent.conf &

Mackerel みてみよう

さて、ここまでやると、グラフが出るようになると思います(電球の下においてあるので温度がおかしいですがw) 便利な世の中ですねぇ。。。

f:id:shase428:20160207015515p:plain:w400

スマホでも監視ができるようになりました!

ダッシュボードに設定すると見やすいですね^^

Raspberry Pi 2 Model Bで部屋の温度計を作ってみよう

いやーインターネットってのは便利ですね。 外部ディスプレイとかなくても、なんとかなります。

買ったもの

MicroSDは8GB~32GBぐらいまでで、お好きなものを。

USB温度計! USB thermometer-528018

USB温度計! USB thermometer-528018

初期設定

ここからraspbian(RASPBIAN JESSIE LITE)を落とす

Download Raspbian for Raspberry Pi

Mac上で下記の作業をする

unzip 2015-11-21-raspbian-jessie-lite.zip

# SDカードをunmount
sudo diskutil unmount /dev/disk3s1

# rawデバイスに書き込む。/dev/disk3s1ならば、/dev/rdisk3となる。
sudo dd bs=1m if=2015-11-21-raspbian-jessie-lite.img of=/dev/rdisk3

microSDをラズパイに挿して、イーサネットケーブルを指し、DHCP環境で起動。 (MacBookProの場合、イーサネットケーブルで直結して、インターネット共有を使うと楽)

arp -aで、割り振られたIPアドレスをてきとうに推測して、sshで接続(初期ID:pi、初期パスワード:raspberry) 固定IPを設定するので、/etc/dhcpcd.confに下記のような感じで追記

interface eth0
static ip_address=192.168.xxx.xxx/24
static routers=192.168.xxx.xxx
static domain_name_servers=192.168.xxx.xxx

で、reboot

TEMPer使う為の設定

以下はラズパイ上で。

# git導入
sudo apt-get update
sudo apt-get install git

# 感謝
sudo git clone git://github.com/bitplane/temper.git
cd temper

# ビルド
sudo apt-get install gcc libusb-dev
sudo make

# 権限付与
sudo chmod u+s temper

ここまでやると

pi@raspberrypi:~/temper $ ./temper
23-Jan-2016 16:50,31.296873

これで温度がでる。すごいなぁ。。。

センサーの感度があんまりよくないとかはあるけど、まぁまぁ、値段相応だよね。

グラフ化を考えつつ、今日はここまで。

2016年もはじまりました

あけましておめでとうございます。

特に抱負めいたものを書くつもりはなかったのですが、気がついたら、今の会社に入社したのが2011年の1月で、ちょうど丸5年。

今年で6年目でした。

新卒で入った会社を3年9ヶ月で辞めているので、随分と長い時間を今の職場で過ごしたのだなーと感じています。

節目なので、何年か先の自分に向けて伝言でも書いておこうかなと。

5年前の2011年を振り返ってみると、26歳の自分は本当にしょぼかったなぁという感想しかありません。

ま、そう思えるということは、多少は進歩してるということなので、5年後に2016年のしゃぜはミジンコだったなーと思えるようにしたいと思っています。

30歳を過ぎてからは30代の自分のテーマをいろいろ模索していて、まだもやっとしているんですが。。。

20代の時に考えていたのは、こんな三十路になりたいなーというのがあって、そのギャップを埋めるべく行動していたので、自分が考えるこんな四十路になりたい、っていうのをあれこれ考えたりしています。

最近、自分で重視しているのは

  • プログラミング
    • 特に言語など問わず
  • 数学
  • 英語

なんですが、今年からはこれに加えて

  • 健康
    • 大きい病気とかはしてないけど、一昨年骨折とかあったしね。
  • 家庭
    • 結婚したので。

が大事かなーと思っています。

今はどれも中途半端なので、それぞれのクオリティをあげていきたいですね!

仕事に関していうと、あれやこれやが積み上がっているので、とにかく目の前の仕事に集中するのを大切にしたいと思います。

というわけで本年もよろしくおねがいします。

30代で人生を逆転させる1日30分勉強法

30代で人生を逆転させる1日30分勉強法