Chapter 8 Spark SQLによるデータ処理
必要となるデータはここを参考に配置
8-8 データ分析
(Chapter 6はうまく動作しなかったのでパス)
データの集計軸の設定方法
データの集計実行
データ出力基準の変更
ウインドウ方式の変更
特定のデータのみ出力、条件にあったデータが少ないためかデータ量があまり出てこなかった。
複数の出力先に出力
画面への出力
kafkaへの出力
kafkaへのデータ送信
UDF化
kafkaからのセンサデータ取得方法(05-04.py)
このままではjsonの文字列を表示しているだけなのでカラムとして認識させるためパースさせた結果(05-05.py)
データの変換方法
表示
センサーデータを用意して`/var/log/sensor_data/`にsensor_data.logとして保存しておく
zookeeper,kafkaを起動しておく。
td-agentの`/etc/td-agent/td-agent.conf`をページ100の05-01.confに沿って変更する。
td-agentをrestartさせる。/var/log/td-agent/td-agent.logを確認して動作しているか確認する
kafkaからデータを取得する。起動させてから約1分待つ
/var/log/td-agent/pos/sensor_data.posになにが書き込まれているか確認する
データが生成されてから直後の数秒以内にレスポンスよく処理をストリーム処理と呼ぶ
バッチ処理とストリーム処理の結合=マイクロバッチ処理
1つ目の端末でncコマンドで待機
nc -lk 8888 #本にはport番号を9999に設定してあるが、Fluentdのhttpのportと衝突するので8888に変更
もう一つの端末で
python /opt/spark-book/05-01.py 192.168.33.10 8888
を実行後 ncを立ち上げた端末で cat cat dogと入力
上と同じように nc -lk 8888 python /opt/spark-book/05-02.py 192.168.33.10 を実行後ncを立ち上げてある端末でcat cat cat dogと入力
本に従ってFluentdの設定ファイル`/etc/td-agent/td-agent.conf‘を書き換え
同じく本に従ってkafka側の設定
td-agent.confを変更したのでFluentdを再起動`systemctl restart td-agent`
kafkaの`cpnsole-consumer`を起動(既にデータを受け付けている)
curlでデータ(sensor-data)を送信(port=9999)
データはほぼ60secごとに送られてくる。変更が可能なのか調べた。
flush_intervalというパラメータを設定すればよい(デフォルトは60sec)
これを10にするとほぼ10secで送ってくる
Pass