アプリケーションエンジニアのためのApache Spark入門を読む

Chapter 4 Fluentd,Kafkaによるデータ収集

4-4 データ収集フローの構築

本に従ってFluentdの設定ファイル`/etc/td-agent/td-agent.conf‘を書き換え
同じく本に従ってkafka側の設定

f:id:bitop:20181021120920p:plain

td-agent.confを変更したのでFluentdを再起動`systemctl restart td-agent`
kafkaの`cpnsole-consumer`を起動(既にデータを受け付けている)

f:id:bitop:20181021122653p:plain

curlでデータ(sensor-data)を送信(port=9999)

f:id:bitop:20181021122809p:plain

データはほぼ60secごとに送られてくる。変更が可能なのか調べた。
flush_intervalというパラメータを設定すればよい(デフォルトは60sec)
これを10にするとほぼ10secで送ってくる

f:id:bitop:20181021123003p:plain

4-5 実運用に向けて

Pass

アプリケーションエンジニアのためのApache Spark入門を読む

Chapter 4 Fluentd,Kafkaによるデータ収集

Chapter 4 Fluentd,Kafkaによるデータ収集

4-3 データ収集詳細 - Apache Kafka詳細

kafkaのinstall
本の通りのurlでは、何故か接続できなかったので本家に行ってdownload

kafka.apache.org

f:id:bitop:20181021082035p:plain

Zookeeperの起動
/opt/kafka/bin/zookeeper-server/start.sh -daemon /opt/kafka/config/zookeeper.properties 
Kafkaの起動
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server-properties  
この2つは仮想環境を停止・再起動したら止まってしまう。

producerからのメッセージをコンソールへの出力させる準備(page 62記載)
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sample-topic --from-beginning
(本ではportを2181となっているが 9092を指定)
表示用プロンプトが立ち上がると書いてあったがプロンプトは立ち上がらなかったがリスリング状態にはなった。

受信の準備ができたのでproducer側からtopicを送る
/opt/ kafka/bin/kafka-console-producter.sh --broker-list localhost:9092 --topic sample-topic
送信プロンプトが立ち上がり送信可能状態となった

f:id:bitop:20181021091840p:plain

f:id:bitop:20181021091846p:plain

アプリケーションエンジニアのためのApache Spark入門を読む

Chapter 1 データ分析プラットフォームの概要

Chapter 2 Sparkの概要

2-5 本書で利用する環境

 Windows10上にVirtusBox+Vagrantで構築とあるがUbuntuを使っているので(mint 18.01)こちらに環境を構築する。

Chapter 3 サンプルユースケース概要

Chapter 4 Fluentd,Kafkaによるデータ収集

4-2 データ収集詳細 - Fluentd詳細

 本に従ってinstall、起動もOK、動作確認OK

f:id:bitop:20180908101816p:plain

ポート番号を8888から9999に変更

f:id:bitop:20180908102128p:plain

curlで問い合わせ

f:id:bitop:20180908111609p:plain

標準でログに保存されるのでVimで確認

f:id:bitop:20180908112119p:plain
確認OK
f:id:bitop:20180908112300p:plain

「ビックデータ分析・活用のためのSQLレシピ」を読む

7-3 データの重複を検出する

7-3-1 マスターデータの重複を検出する

import pandas as pd
import numpy as np
import psycopg2

conn = psycopg2.connect("dbname=BigData host=localhost user=testuser")
mst_categories = pd.read_sql("SELECT * FROM mst_categories", conn)
print(mst_categories)

f:id:bitop:20180826071909p:plain

mst_categories.duplicated('id')

f:id:bitop:20180826071943p:plain

print(mst_categories[mst_categories.duplicated('id')])

f:id:bitop:20180826072008p:plain

「ビックデータ分析・活用のためのSQLレシピ」を読む

7-2 異常値を検出する

7-2-1 データの分布を計算する

import pandas as pd
import numpy as np
import psycopg2

conn = psycopg2.connect("dbname=BigData host=localhost user=testuser")
action_log_with_noise = pd.read_sql("SELECT * FROM action_log_with_noise", conn)
print(action_log_with_noise.head(3))

f:id:bitop:20180819064102p:plain

s1 = action_log_with_noise.groupby('session').size()
s2= s1.rank(method='first')
df = pd.DataFrame([s1,s2])
print(df.T)

f:id:bitop:20180819064144p:plain

「ビックデータ分析・活用のためのSQLレシピ」を読む

7章 データ活用の精度を高めるための分析術

7-1 データを組み合わせて、新たな切り口を作る

7-1-1 IPアドレスから国・地域を補完する

import pandas as pd
import numpy as np

mst_city_ip = pd.read_csv('GeoLite2-City-Blocks-IPv4.csv')
print(mst_city_ip.head(3))

f:id:bitop:20180814080541p:plain

mst_locations = pd.read_csv('GeoLite2-City-Locations-ja.csv')
print(mst_locations.head(3))

f:id:bitop:20180814080626p:plain

import psycopg2

conn = psycopg2.connect("dbname=BigData host=localhost user=testuser")
action_log = pd.read_sql("SELECT * FROM action_log_with_ip", conn)
print(action_log.head(3))

f:id:bitop:20180814080717p:plain

import ipaddress

action_log['ip'] = action_log['ip'].map(lambda x:ipaddress.ip_address(x))
mst_city_ip['network'] = mst_city_ip['network'].map(lambda x:ipaddress.ip_network(x))

print(action_log.head(3))
print(mst_city_ip.head(3))

f:id:bitop:20180814080812p:plain

pd.merge(action_log,mst_city_ip,left_on='ip',right_on='network')
#無理でした

「ビックデータ分析・活用のためのSQLレシピ」を読む

6章 Webサイトでの行動を把握するためのデータ抽出

6-1 サイト全体の特徴・傾向を見つける

6-1-1 日次の訪問者数・訪問回数・ページビューを集計する

import pandas as pd
import psycopg2
import numpy as np

conn = psycopg2.connect("dbname=BigData host=localhost user=testuser")
df_acc = pd.read_sql("SELECT * FROM access_log", conn)
print(df_acc.head(3))

f:id:bitop:20180813081216p:plain

conn = psycopg2.connect("dbname=BigData host=localhost user=testuser")
df_pur = pd.read_sql("SELECT * FROM purchase_log", conn)
print(df_pur.head(3))

f:id:bitop:20180813081300p:plain

df_acc['date'] = pd.to_datetime(df_acc['stamp'])
df_acc['date'] = df_acc['date'].dt.strftime('%Y-%m-%d')
df_acc_gou = df_acc.groupby('date')
data1 = df_acc_gou['long_session'].unique().map(lambda x:len(x))
data2 = df_acc_gou['short_session'].unique().map(lambda x:len(x))
data3 = df_acc_gou['stamp'].count()
df = pd.DataFrame({'access_users':data1,'access_count':data2,'page_view':data3})
df['pv_per_user'] = df['page_view']/df['access_count']
print(df)

f:id:bitop:20180813081349p:plain