【Python】bitflyerのTickerを SQLiteに突っ込む(定期処理)
おはようございます。
前々回に続き、TickerをSQLiteに登録する処理で、今回は定期的に実行する方法を書いてみました。
本当はスレッドなんかで各分毎に処理を動かしたかったのですが、大分躓いてしまったのでとりあえず1分単位のみ。
プログラムは前々回のものを流用します。
【Python】BitflyerのTickerをSQLiteに突っ込む
スポンサーリンク
新規クラスの追加
単独で動かすクラスを追加
BackgroundSaveTicker.py
# -*- coding: utf-8 -*- """ Created on 2018/03/14 @author: doraxdora """ import json import time import threading import asyncio from BfApi import BfApi from Utils.SQLiteUtil import SQLiteUtil class BackgroundSaveTicker: def __init__(self, interval, wait=True, db_name="Ticker.db"): self.interval = interval self.wait = wait self.db_name = db_name self.api = BfApi() def task(self): data = self.api.call_pub_nub('lightning_ticker_FX_BTC_JPY') message = json.dumps(data) print(self.db_name + ":" + message) sqlite_util = SQLiteUtil(db_name=self.db_name) sqlite_util.insert_data(self.api.convert_ticker_tuple(data)) def save(self): base_time = time.time() next_time = 0 while True: t = threading.Thread(target=self.task) t.start() if self.wait: t.join() next_time = ((base_time - time.time()) % self.interval) or self.interval time.sleep(next_time) return True def print(self): print(self.db_name) if __name__ == '__main__': print('Main start') BackgroundSaveTicker(60, False, "Ticker1M.db").save() print('Main end')
プログラムの修正
APIの修正
BfApi.py
pubnubのコールバッククラスに追加した処理をクラスの外に抽出
(分かりにくいので全体を載せておきます)
# -*- coding: utf-8 -*- """ Created on 2018/03/14 @author: doraxdora """ import hashlib import hmac import json import logging import requests import time import urllib from dateutil import parser from datetime import timedelta from pubnub.callbacks import SubscribeCallback from pubnub.enums import PNStatusCategory from pubnub.pnconfiguration import PNConfiguration from pubnub.pubnub import PubNub, SubscribeListener from pubnub.pubnub_tornado import PubNubTornado from common.Constants import Constants from Utils.SQLiteUtil import SQLiteUtil class BfApi: """ Bitflyer API を利用するためのツールクラス """ def __init__(self, access_key="キー", secret_key="シークレットキー"): self.access_key = access_key self.secret_key = secret_key self.api_url = "https://api.bitflyer.jp" self.pb_config = PNConfiguration() self.pb_config.subscribe_key = "sub-c-52a9ab50-291b-11e5-baaa-0619f8945a4f" self.pb_config.ssl = False self.pub_nub = PubNubTornado(self.pb_config) self.listener = None def call_pub_nub(self, channels): """ pubnub を利用して指定したチャネルからデータを取得 :param channels: 接続チャネル :return: リアルタイム配信データ """ # pubnubの生成 self.pub_nub = PubNub(self.pb_config) self.listener = SubscribeListener() self.pub_nub.add_listener(self.listener) # チャンネルへ接続要求し接続を待機する self.pub_nub.subscribe().channels(channels).execute() self.listener.wait_for_connect() # リアルタイム配信からデータを取得 result = self.listener.wait_for_message_on(channels) data = result.message # チャンネルの接続解除を要求し切断を待機する self.pub_nub.unsubscribe().channels(channels).execute() self.listener.wait_for_disconnect() return data def send_req(self, api_path, http_method="GET", timeout=None, **send_params): """ Bitflyer Private API を利用してリクエストを送信し 取得したデータを JSON 形式で返却します :param api_path: 呼び出す Private API のパス :param http_method: GET/POST :param timeout: 接続タイムアウト時間 :param send_params: APIに送信するパラメータ :return: 取得したデータのJSON """ url = self.api_url + api_path body = "" auth_header = None if http_method == Constants.HTTP.POST: body = json.dumps(send_params) else: if send_params: body = "?" + urllib.parse.urlencode(send_params) if self.access_key and self.secret_key: access_time = str(time.time()) encode_secret_key = str.encode(self.secret_key) encode_text = str.encode(access_time + http_method + api_path + body) access_sign = hmac.new(encode_secret_key, encode_text, hashlib.sha256).hexdigest() auth_header = { 'ACCESS-KEY': self.access_key, 'ACCESS-TIMESTAMP': access_time, 'ACCESS-SIGN': access_sign, 'Content-Type': 'application/json' } try: with requests.Session() as s: if auth_header: s.headers.update(auth_header) if http_method == Constants.HTTP.GET: response = s.get(url, params=send_params, timeout=timeout) else: response = s.post(url, data=json.dumps(send_params), timeout=timeout) except requests.RequestException as e: logging.error(e) raise e content = "" if len(response.content) > 0: content = json.loads(response.content.decode("utf-8")) return content def start_pub_nub_ticker(self, cl, channels): """ ティッカー情報の配信を開始 :param cl: web_socket client :param channels: 配信するチャンネル :return: """ self.listener = self.MySubscriberCallback(cl) self.pub_nub.add_listener(self.listener) self.pub_nub.subscribe().channels(channels).execute() def stop_pub_nub_ticker(self, channels): """ ティッカー情報の配信を停止 :param channels: 停止するチャンネル :return: """ self.pub_nub.unsubscribe().channels(channels).execute() self.pub_nub.remove_listener(self.listener) def save_ticker(self, message): """ ティッカー情報をデータベースに保存 :param message: :return: """ sqlite_util = SQLiteUtil() sqlite_util.insert_data(self.convert_ticker_tuple(message.message)) def convert_ticker_tuple(self, ticker): """ ティッカー情報をタプルに変換して返す :param ticker: :return: """ return ( ticker["product_code"] , self.parse_date(ticker["timestamp"]) , ticker["tick_id"] , ticker["best_bid"] , ticker["best_ask"] , ticker["best_bid_size"] , ticker["best_ask_size"] , ticker["total_bid_depth"] , ticker["total_ask_depth"] , ticker["ltp"] , ticker["volume"] , ticker["volume_by_product"] ) def parse_date(self, iso_date): date_time = parser.parse(iso_date) + timedelta(hours=9) return date_time.strftime("%Y/%m/%d %H:%M:%S") class MySubscriberCallback(SubscribeCallback): """ Pubnub登録のコールバッククラス """ def __init__(self, client=None): self.cl = client def presence(self, pubnub, presence): pass # handle incoming presence data def status(self, pubnub, status): if status.category == PNStatusCategory.PNUnexpectedDisconnectCategory: pass # This event happens when radio / connectivity is lost elif status.category == PNStatusCategory.PNConnectedCategory: # Connect event. You can do stuff like publish, and know you'll get it. # Or just use the connected event to confirm you are subscribed for # UI / internal notifications, etc pass elif status.category == PNStatusCategory.PNReconnectedCategory: pass # Happens as part of our regular operation. This event happens when # radio / connectivity is lost, then regained. elif status.category == PNStatusCategory.PNDecryptionErrorCategory: pass # Handle message decryption error. Probably client configured to # encrypt messages and on live data feed it received plain text. def message(self, pubnub, message): """ 登録したチャンネルからメッセージを受信した際の処理 :param pubnub: :param message: :return: """ # WEBソケットを利用してクライアントに配信 for c in self.cl: c.write_message(message.message) self.save_ticker(message)
SQLiteユーティリティの修正
インスタンス生成時にDB名を渡すように修正し、
削除用のメソッドを追加。
import sqlite3 import logging from contextlib import closing class SQLiteUtil: """ SQLite 操作用クラス """ def __init__(self, db_name="Ticker.db"): """ インスタンス作成時にテーブルを作る """ self.db_name = "database\\" + db_name self.create_db() def create_db(self): """ データベース、及び必要なテーブルを作成します. :return: """ logging.info("create database") with closing(sqlite3.connect(self.db_name)) as conn: c = conn.cursor() # ティッカーテーブル sql = "CREATE TABLE IF NOT EXISTS TBL_TICKER (" sql += " PRODUCT_CODE TEXT" sql += ", TIME_STAMP TEXT" sql += ", TICK_ID TEXT" sql += ", BEST_BID REAL" sql += ", BEST_ASK REAL" sql += ", BEST_BID_SIZE REAL" sql += ", BEST_ASK_SIZE REAL" sql += ", TOTAL_BID_DEPTH REAL" sql += ", TOTAL_ASK_DEPTH REAL" sql += ", LTP REAL" sql += ", VOLUME REAL" sql += ", VOLUME_BY_PRODUCT REAL" sql += ", PRIMARY KEY (TICK_ID)" sql += ")" c.execute(sql) c.close() conn.commit() def delete_data(self): """ データを削除します :return: """ logging.info("delete_data") with closing(sqlite3.connect(self.db_name)) as conn: c = conn.cursor() # データクリア sql = "DELETE FROM TBL_TICKER" c.execute(sql) c.close() conn.commit() def insert_data(self, ticker): """ データを登録します :param ticker: :return: """ with closing(sqlite3.connect(self.db_name)) as conn: c = conn.cursor() # 猫データ sql = "INSERT INTO TBL_TICKER VALUES (?,?,?,?,?,?,?,?,?,?,?,?)" c.execute(sql, ticker) c.close() conn.commit()
起動、データ確認
新規作成した BackgroundSaveTickerを起動しデータを確認
A5SQLは、いちいち接続先の設定を作らなくてはならず面倒だったので、新しく「PupSQLite」というツールを使ってみる。
無事に1分毎のデータが取れました。
ただ、これでいいのだろうか。。
まとめ
pubnubによる配信(タイミング)でデータ登録するのではなく、こちらからタイミングを決めてデータを登録する方法でした。
まだちょっと非同期やスレッドの部分であいまいなところがあり躓いてしまったのでそこらへんも勉強していこうと思います。
ではでは。
ディスカッション
コメント一覧
まだ、コメントがありません