Treasure DataとEmbulkで、バスケット分析をRENOSY MAGAZINEに導入してみた

はじめに

こんにちは。RENOSYプロダクトの開発をしている生田です。
先日、RENOSY MAGAZINE(以下、MAGAZINE)におすすめ記事の機能を追加しました。
その際、Embulkという技術を使ってデータの抽出・転送を行ったので、その方法等について共有します。

f:id:taimei_soda:20200312110034p:plain

Treasure Data - Embulk - MAGAZINE構成図

いきなりですが、全体の構成図を示します。
各処理の詳細は後述します。

f:id:taimei_soda:20200310122213p:plain
embulk構成図

背景

MAGAZINEはブログ記事や不動産用語集など、お客様の気になる情報や役立つ情報を掲載するWebサイトです。
今回このMAGAZINEにおすすめ記事を実装することになりました。
「この記事を見た人がよく見ている記事」的な、定番のアレです。
ねらいは、お客様が閲覧中の記事以外の記事にも興味を持っていただき、回遊率をあげることによって、見込み顧客の育成を行うことです。

設計方針

おすすめ記事としてどの記事を表示するのかの判定材料として、お客様がどの記事とどの記事を一緒にみることが多いのか、いわゆる「共起度」を採用することにしました。
PVデータはTreasure Dataに蓄積されているので、ここからデータを取得して共起度を分析します。
分析した結果をMAGAZINEのデータベースに保存し、これを元にアプリケーション側で表示する記事を選定し、サイト上に表示します。
また、Treasure Dataのデータはユーザーがページにアクセスするたびに更新されるので、定期的にデータを再取得するバッチを回します。

実装方法

Embulk

Treasure DataからPVデータを取得・分析し、MAGAZINEのデータベースに保存する部分を、Embulkというミドルウェアを使って実現しました。
Embulkとは、Treasure Data社が提供するオープンソースのデータ転送ミドルウェアで、分散並列処理を使用したバッチ処理などのバルク処理に適しています。
また、プラグインをインストールすることによって機能を拡張していく構造になっており、様々なプラグインが公開されているため、データベースへの接続や出力の柔軟性が高いのも特徴です。

バスケット分析

共起度分析のロジックにはバスケット分析を用いました。
簡単言うと、同じユーザーがある記事Aとある記事Bを両方見る事象が何回起きたか、という共起回数をもとに、共起度を計算するというものです。

バッチ処理

バッチ処理にはAWSのECS Scheduled Tasksを使用しました。
前段階として、バッチで使うdocker imageをAWS ECRにpushしておく必要があります。
ECRにpushする処理は、CircleCIによる継続的インテグレーションの中に組み込みました。
具体的には、Embulk実行に必要なファイルを管理するためのリポジトリがあり、その特定のブランチへのpushをトリガーとして、DockerfileイメージをECRにpushするCircleCIのjobが実行される、という流れです。
ECS Scheduled Tasks側で、使用するイメージのurl(ECR内のdocker image)を指定しておくと、そのイメージをpullしてきてrunしてくれます。
また、cronでバッチ実行の時間指定をすることができます。

Embulkの使い方

Embulkの使い方を、実際に作成したソースコードと共に紹介します。
Embulkをインストールし、その後必要なプラグインをインストールします。
また、config配下に任意の名前で.ymlファイル、もしくは.yml.liquidファイルを作成し、データの分析や転送の設定を書きます。
.yml.liquidにすると、ファイル内で環境変数を使えるようになります。

  • Dockerfile
FROM openjdk:8-jre-slim

RUN apt update && \
    apt install -y curl jq && \
    curl --create-dirs -o /usr/local/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar" && \
    chmod +x /usr/local/bin/embulk

RUN embulk gem install embulk-input-td embulk-output-mysql

WORKDIR /myapp

COPY . /myapp

CMD [ "bash", "-c", "sh embulk-execute.sh" ]

embulkのコア部分はjavaで書かれているため、イメージはopenjdkを使いました。
プラグインはjrubyで実装されており、embulk gem install [plugin]のような形でインストールします。

  • embulk-execute.sh
#!/bin/sh

# Secrets Managerから取得した秘匿情報が含まれるJSONのキーを環境変数に展開
$( echo $SECRETS | jq -r 'keys[] as $k | "export \($k)=\(.[$k])"' )

embulk run config/basket_analysis.yml.liquid
  • config/basket_analysis.yml.liquid
in:
  type: td
  apikey: {{ env.API_KEY }}
  endpoint: api.treasuredata.com
  database: website_tracking
  query: |
    WITH magazine_entry_id AS(
      SELECT 
        td_global_id,
        REGEXP_EXTRACT(td_url, '\d+$') AS entry_id
      FROM
        track_pageviews
      WHERE
        REGEXP_EXTRACT(td_url, '/www.renosy.com/magazine/entries/\d+$') IS NOT NULL
    ) SELECT

~~ 省略 ~~

out:
  type: mysql
  host: {{ env.host }}
  port: {{ env.port }}
  user: {{ env.username }}
  database: {{ env.dbname }}
  password: {{ env.password }}
  table: magazine_entry_cooccurrence_degrees
  mode: truncate_insert

in:の下にデータの取得先に対する設定を書きます。
今回の例ではTreasure Dataからデータを取得するので、プラグインembulk-input-tdを使用しました。 query:にバスケット分析(前述)のクエリを渡しています。

out:の下にはデータの転送先に対する設定を書きます。
今回はMAGAZINEのデータベース(mysql使用)に接続するので、mysqlのプラグインembulk-output-mysqlを用いました。
mode:でデータを上書きするのか、追加するのか、transactionをはるのか、といった設定を選ぶことができます。

まとめ

触ってみった所感

Embulkを使ってみた感想としては、設定ファイル1つと必要なpluginのインストールだけで出来るのでとてもお手軽だなと思いました。

課題と感じたところ

output側に既存のテーブルを設定した場合、input側のデータの型と合わなかったり、output側にあるカラムがinput側になかったりすると、その差分を埋めるための工夫をする必要があり、そこが大変だなと感じました。

解決した方法

例えば、railsではデフォルトでcreated_at, updated_atというカラムが作られますが、これらはTreasure Dataのtableにはなかったので、上記の設定ファイルでは失敗してしまいます。
その場合には、設定ファイルにカラムの追加と現在の時間を値に入れる処理を追加する、あるいはrailsのcreated_at, updated_atカラムを削除する等の解決方法が考えられます。
今回は特に日時を保存しておく必要が特になかったので、後者の方法を取りました。

まとめ

上記のような多少のつらみはあるものの、基本的な設定だけなら、Embulk初心者の私でもすぐに実行ができたので、「データ転送をお手軽にやりたい!」という方にはおすすめです。
皆さんも試しに使ってみてはいかがでしょうか。