【Python】BitflyerのTickerをSQLiteに突っ込む
おはようございます。
今回は、前回までに取得できるようにした Bitflyer の Ticker情報を、そのままSQLiteに保存してみました。
プログラムは前回のものを流用します。
【Python】pubnub と WebSocket で更にリアルタイムで情報を取得する
スポンサーリンク
新規クラスの追加
Utils/SQLiteUtil.py
import sqlite3 import logging from contextlib import closing class SQLiteUtil: u""" SQLite 操作用クラス """ def __init__(self): """ インスタンス作成時にテーブルを作る """ self.db_name = "Ticker.db" self.create_db() def create_db(self): u""" データベース、及び必要なテーブルを作成します. """ logging.info("create database") with closing(sqlite3.connect(self.db_name)) as conn: c = conn.cursor() # データクリア sql = "DELETE FROM TBL_TICKER" c.execute(sql) # ティッカーテーブル sql = "CREATE TABLE IF NOT EXISTS TBL_TICKER (" sql += " PRODUCT_CODE TEXT" sql += ", TIME_STAMP TEXT" sql += ", TICK_ID TEXT" sql += ", BEST_BID REAL" sql += ", BEST_ASK REAL" sql += ", BEST_BID_SIZE REAL" sql += ", BEST_ASK_SIZE REAL" sql += ", TOTAL_BID_DEPTH REAL" sql += ", TOTAL_ASK_DEPTH REAL" sql += ", LTP REAL" sql += ", VOLUME REAL" sql += ", VOLUME_BY_PRODUCT REAL" sql += ", PRIMARY KEY (TICK_ID)" sql += ")" c.execute(sql) c.close() conn.commit() def insert_data(self, ticker): u""" 渡されたタプルデータを登録します """ with closing(sqlite3.connect(self.db_name)) as conn: c = conn.cursor() # 猫データ sql = "INSERT INTO TBL_TICKER VALUES (?,?,?,?,?,?,?,?,?,?,?,?)" c.execute(sql, ticker) c.close() conn.commit()
プログラムの修正
BfApi.py
ライブラリ使用宣言の追加
from dateutil import parser from datetime import timedelta from Utils.SQLiteUtil import SQLiteUtil
pubnubのコールバッククラスで、メッセージ受信時の処理を修正
class MySubscriberCallback(SubscribeCallback): """ Pubnub登録のコールバッククラス """ def __init__(self, client): self.cl = client self.sqlite_util = SQLiteUtil() def presence(self, pubnub, presence): pass # handle incoming presence data def status(self, pubnub, status): if status.category == PNStatusCategory.PNUnexpectedDisconnectCategory: pass # This event happens when radio / connectivity is lost elif status.category == PNStatusCategory.PNConnectedCategory: # Connect event. You can do stuff like publish, and know you'll get it. # Or just use the connected event to confirm you are subscribed for # UI / internal notifications, etc pass elif status.category == PNStatusCategory.PNReconnectedCategory: pass # Happens as part of our regular operation. This event happens when # radio / connectivity is lost, then regained. elif status.category == PNStatusCategory.PNDecryptionErrorCategory: pass # Handle message decryption error. Probably client configured to # encrypt messages and on live data feed it received plain text. def message(self, pubnub, message): """ 登録したチャンネルからメッセージを受信した際の処理 :param pubnub: :param message: :return: """ # WEBソケットを利用してクライアントに配信 for c in self.cl: d = message.message self.sqlite_util.insert_data(( d["product_code"] , self.parse_date(d["timestamp"]) , d["tick_id"] , d["best_bid"] , d["best_ask"] , d["best_bid_size"] , d["best_ask_size"] , d["total_bid_depth"] , d["total_ask_depth"] , d["ltp"] , d["volume"] , d["volume_by_product"] )) c.write_message(message.message) def parse_date(self, iso_date): date_time = parser.parse(iso_date) + timedelta(hours=9) return date_time.strftime("%Y/%m/%d %H:%M:%S")
データを追加してみる
起動して一覧の更新をスタートしてみてから、
A5:SQL Mk2 にてデータの確認をしました。
無事にデータが追加されています。
まとめ
単純にpubnubだけ動かすツールを作って、データを収集するだけでもよさそうですね。
ただ、Ticker情報だとデータ量が膨大になるため、1分、5分、10分などで処理やテーブルを分けてしまってもいいかもしれません。
ではでは。
ディスカッション
コメント一覧
まだ、コメントがありません