2014/06/10

Google AppEngineからBigQueryのStream APIにデータを流しこむ方法

BigQueryはビッグデータ解析基盤としてかなり優秀だと思っていました。
ただ、データのアップロードがかなり面倒だった(一度Google Cloud StorageにアップロードしてBigQueryからそのデータを引っ張る)ので二の足を踏んでいました。
最近、Stream APIの制限がかなり緩和されたので、これを期にGoogle AppEngine(Python)と連携させてみた。

STEP1: BigQueryのセットアップ
まずはBigQueryを使える状況にする必要がある。
Google Developers Consoleからプロジェクトを作り、プロジェクトメニューの「APIs」からBigQuery APIをONにする。プロジェクト自体のBilling設定が必要なので注意。といってもかなりの量は無料で使える。

STEP2: クラウドのパーミッション設定
Google Developers Consoleのプロジェクトの「権限(Permissions)」メニューの中の「サービス アカウント」にApp Engineのサービスアカウントを追加する。
AppEngineのサービスアカウントはAppEngineのコンソールの「Application Settings」メニューの中の「Service Account Name」の内容。xxxx@appspot.gserviceaccount.comの様なフォーマットになっているのがそれ。
これで、AppEngineから同一プロジェクトのクラウドサービスへ簡単に(&安全に)接続できるようになります。

STEP3: Google AppEngineにライブラリーをインストール
Google APIを簡単に使うためのライブラリを幾つかインストールしておきます。Pure PythonなのでファイルをコピーするだけでOK。
Downloadはgoogle-api-python-clientからgoogle-api-python-client-1.2.zipをダウンロードしました。
ZIPファイル内の
・apiclientフォルダ
・oauth2clientフォルダ
・uritemplateフォルダ
の3つのフォルダをAppEngineのプロジェクトにコピーします。
私はライブラリ関係をまとめておくため、AppEngineのプロジェクトフォルダ内にlibフォルダを作り、そこに上記3つのフォルダーをコピーしています。

STEP4: AppEngineのからデータをBigQueryに流し込む
BigQueryのStream APIを使う。
Pythonのソースコードは以下の通り
if 'lib' not in sys.path:
    sys.path[0:0] = ['lib']

PROJECT_ID = 'my-project'
SCOPE = 'https://www.googleapis.com/auth/bigquery'

class BqHandler(webapp.RequestHandler):
  def get(self):
    import httplib2
    from apiclient import discovery
    from oauth2client import appengine
    from google.appengine.api import memcache

    credentials = appengine.AppAssertionCredentials(scope=SCOPE)
    http = credentials.authorize(httplib2.Http(memcache))
    bigquery = discovery.build('bigquery', 'v2', http=http)

    body = { 'rows':[{'json':{'id':1, 'name':'Larry'}}, {'json':{'id':2, 'name':'Sergey'}}] }
    response = bigquery.tabledata().insertAll(projectId=PROJECT_ID,
                                                                     datasetId='AppEngine',
                                                                     tableId='MyTable',
                                                                     body=body).execute()

    return

application = webapp.WSGIApplication(
                                     [('/bq', BqHandler)],
                                     debug=True)

def main():
    run_wsgi_app(application)

if __name__ == "__main__":
    main()

BigQueryに流し込むデータはbodyです。BigQueryはCSV形式とJSON形式をサポートしていますので、ここではJSON形式にします。
insertAllメソッドでStream APIを使っています。
datasetIdとはデータテーブルを纏める上位構造で、実際のデータが格納されるのはtableIdのテーブルです。

STEP5: BigQueryにDatasetとテーブルを作る
STEP4のプログラムを実行しても、まだBigQuery側に対象となるDatasetとテーブルが存在しないのでデータが保存できません。
そこで、BigQueryのコンソールからまず、DatasetとTableを作ります。

プロジェクト名の右側のプルダウンメニューから「Create new dataset」を選択します。
Dataset IDを「AppEngine」にします。
つぎに作られた「AppEngine」のデータセットの右側のプルダウンメニューから「Create new table」でデータを格納するテーブルを作成します。
「Table ID」を「MyTable」にし、Source FormatをJSONとし、サンプルとなるファイルをアップロードします。
例えば以下の様なJSONファイルをアップロードします。

{"id":0,"name":"test"}

次にテーブルのスキーマを指定します。
今回はidとnameで、それぞれの型が「integer」と「string」なので、以下のように設定します。
id:integer,name:string
これで設定が完了です。アップロードしたJSONファイルと設定したスキーマからテーブルが自動で作成されれば成功です(数十秒かかるようです)。
これでSTEP4のプログラムをAppEngine上で動かせば、bodyの内容がAppEngineデータセットのMyTableにデータが格納されます。
これで、AppEngineからBigQueryに色々なデータを格納できるようになり、そこで解析できるようになりました。