【Python】bitflyerのTickerを SQLiteに突っ込む(定期処理)
おはようございます。
前々回に続き、TickerをSQLiteに登録する処理で、今回は定期的に実行する方法を書いてみました。
本当はスレッドなんかで各分毎に処理を動かしたかったのですが、大分躓いてしまったのでとりあえず1分単位のみ。
プログラムは前々回のものを流用します。
【Python】BitflyerのTickerをSQLiteに突っ込む
スポンサーリンク
新規クラスの追加
単独で動かすクラスを追加
BackgroundSaveTicker.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 | # -*- coding: utf-8 -*- """ Created on 2018/03/14 @author: doraxdora """ importjson importtime importthreading importasyncio fromBfApi importBfApi fromUtils.SQLiteUtil importSQLiteUtil classBackgroundSaveTicker: def__init__(self,interval,wait=True,db_name="Ticker.db"): self.interval=interval self.wait=wait self.db_name=db_name self.api=BfApi() deftask(self): data=self.api.call_pub_nub('lightning_ticker_FX_BTC_JPY') message=json.dumps(data) print(self.db_name+":"+message) sqlite_util=SQLiteUtil(db_name=self.db_name) sqlite_util.insert_data(self.api.convert_ticker_tuple(data)) defsave(self): base_time=time.time() next_time=0 whileTrue: t=threading.Thread(target=self.task) t.start() ifself.wait: t.join() next_time=((base_time-time.time())%self.interval)orself.interval time.sleep(next_time) returnTrue defprint(self): print(self.db_name) if__name__=='__main__': print('Main start') BackgroundSaveTicker(60,False,"Ticker1M.db").save() print('Main end') |
プログラムの修正
APIの修正
BfApi.py
pubnubのコールバッククラスに追加した処理をクラスの外に抽出
(分かりにくいので全体を載せておきます)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 | # -*- coding: utf-8 -*- """ Created on 2018/03/14 @author: doraxdora """ importhashlib importhmac importjson importlogging importrequests importtime importurllib fromdateutil importparser fromdatetimeimporttimedelta frompubnub.callbacks importSubscribeCallback frompubnub.enums importPNStatusCategory frompubnub.pnconfiguration importPNConfiguration frompubnub.pubnub importPubNub,SubscribeListener frompubnub.pubnub_tornado importPubNubTornado fromcommon.Constants importConstants fromUtils.SQLiteUtil importSQLiteUtil classBfApi: """ Bitflyer API を利用するためのツールクラス """ def__init__(self,access_key="キー",secret_key="シークレットキー"): self.access_key=access_key self.secret_key=secret_key self.api_url="https://api.bitflyer.jp" self.pb_config=PNConfiguration() self.pb_config.subscribe_key="sub-c-52a9ab50-291b-11e5-baaa-0619f8945a4f" self.pb_config.ssl=False self.pub_nub=PubNubTornado(self.pb_config) self.listener=None defcall_pub_nub(self,channels): """ pubnub を利用して指定したチャネルからデータを取得 :param channels: 接続チャネル :return: リアルタイム配信データ """ # pubnubの生成 self.pub_nub=PubNub(self.pb_config) self.listener=SubscribeListener() self.pub_nub.add_listener(self.listener) # チャンネルへ接続要求し接続を待機する self.pub_nub.subscribe().channels(channels).execute() self.listener.wait_for_connect() # リアルタイム配信からデータを取得 result=self.listener.wait_for_message_on(channels) data=result.message # チャンネルの接続解除を要求し切断を待機する self.pub_nub.unsubscribe().channels(channels).execute() self.listener.wait_for_disconnect() returndata defsend_req(self,api_path,http_method="GET",timeout=None,**send_params): """ Bitflyer Private API を利用してリクエストを送信し 取得したデータを JSON 形式で返却します :param api_path: 呼び出す Private API のパス :param http_method: GET/POST :param timeout: 接続タイムアウト時間 :param send_params: APIに送信するパラメータ :return: 取得したデータのJSON """ url=self.api_url+api_path body="" auth_header=None ifhttp_method==Constants.HTTP.POST: body=json.dumps(send_params) else: ifsend_params: body="?"+urllib.parse.urlencode(send_params) ifself.access_key andself.secret_key: access_time=str(time.time()) encode_secret_key=str.encode(self.secret_key) encode_text=str.encode(access_time+http_method+api_path+body) access_sign=hmac.new(encode_secret_key,encode_text,hashlib.sha256).hexdigest() auth_header={ 'ACCESS-KEY':self.access_key, 'ACCESS-TIMESTAMP':access_time, 'ACCESS-SIGN':access_sign, 'Content-Type':'application/json' } try: withrequests.Session()ass: ifauth_header: s.headers.update(auth_header) ifhttp_method==Constants.HTTP.GET: response=s.get(url,params=send_params,timeout=timeout) else: response=s.post(url,data=json.dumps(send_params),timeout=timeout) exceptrequests.RequestException ase: logging.error(e) raisee content="" iflen(response.content)>0: content=json.loads(response.content.decode("utf-8")) returncontent defstart_pub_nub_ticker(self,cl,channels): """ ティッカー情報の配信を開始 :param cl: web_socket client :param channels: 配信するチャンネル :return: """ self.listener=self.MySubscriberCallback(cl) self.pub_nub.add_listener(self.listener) self.pub_nub.subscribe().channels(channels).execute() defstop_pub_nub_ticker(self,channels): """ ティッカー情報の配信を停止 :param channels: 停止するチャンネル :return: """ self.pub_nub.unsubscribe().channels(channels).execute() self.pub_nub.remove_listener(self.listener) defsave_ticker(self,message): """ ティッカー情報をデータベースに保存 :param message: :return: """ sqlite_util=SQLiteUtil() sqlite_util.insert_data(self.convert_ticker_tuple(message.message)) defconvert_ticker_tuple(self,ticker): """ ティッカー情報をタプルに変換して返す :param ticker: :return: """ return( ticker["product_code"] ,self.parse_date(ticker["timestamp"]) ,ticker["tick_id"] ,ticker["best_bid"] ,ticker["best_ask"] ,ticker["best_bid_size"] ,ticker["best_ask_size"] ,ticker["total_bid_depth"] ,ticker["total_ask_depth"] ,ticker["ltp"] ,ticker["volume"] ,ticker["volume_by_product"] ) defparse_date(self,iso_date): date_time=parser.parse(iso_date)+timedelta(hours=9) returndate_time.strftime("%Y/%m/%d %H:%M:%S") classMySubscriberCallback(SubscribeCallback): """ Pubnub登録のコールバッククラス """ def__init__(self,client=None): self.cl=client defpresence(self,pubnub,presence): pass # handle incoming presence data defstatus(self,pubnub,status): ifstatus.category==PNStatusCategory.PNUnexpectedDisconnectCategory: pass # This event happens when radio / connectivity is lost elifstatus.category==PNStatusCategory.PNConnectedCategory: # Connect event. You can do stuff like publish, and know you'll get it. # Or just use the connected event to confirm you are subscribed for # UI / internal notifications, etc pass elifstatus.category==PNStatusCategory.PNReconnectedCategory: pass # Happens as part of our regular operation. This event happens when # radio / connectivity is lost, then regained. elifstatus.category==PNStatusCategory.PNDecryptionErrorCategory: pass # Handle message decryption error. Probably client configured to # encrypt messages and on live data feed it received plain text. defmessage(self,pubnub,message): """ 登録したチャンネルからメッセージを受信した際の処理 :param pubnub: :param message: :return: """ # WEBソケットを利用してクライアントに配信 forcinself.cl: c.write_message(message.message) self.save_ticker(message) |
SQLiteユーティリティの修正
インスタンス生成時にDB名を渡すように修正し、
削除用のメソッドを追加。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 | importsqlite3 importlogging fromcontextlibimportclosing classSQLiteUtil: """ SQLite 操作用クラス """ def__init__(self,db_name="Ticker.db"): """ インスタンス作成時にテーブルを作る """ self.db_name="database\\"+db_name self.create_db() defcreate_db(self): """ データベース、及び必要なテーブルを作成します. :return: """ logging.info("create database") withclosing(sqlite3.connect(self.db_name))asconn: c=conn.cursor() # ティッカーテーブル sql="CREATE TABLE IF NOT EXISTS TBL_TICKER (" sql+=" PRODUCT_CODE TEXT" sql+=", TIME_STAMP TEXT" sql+=", TICK_ID TEXT" sql+=", BEST_BID REAL" sql+=", BEST_ASK REAL" sql+=", BEST_BID_SIZE REAL" sql+=", BEST_ASK_SIZE REAL" sql+=", TOTAL_BID_DEPTH REAL" sql+=", TOTAL_ASK_DEPTH REAL" sql+=", LTP REAL" sql+=", VOLUME REAL" sql+=", VOLUME_BY_PRODUCT REAL" sql+=", PRIMARY KEY (TICK_ID)" sql+=")" c.execute(sql) c.close() conn.commit() defdelete_data(self): """ データを削除します :return: """ logging.info("delete_data") withclosing(sqlite3.connect(self.db_name))asconn: c=conn.cursor() # データクリア sql="DELETE FROM TBL_TICKER" c.execute(sql) c.close() conn.commit() definsert_data(self,ticker): """ データを登録します :param ticker: :return: """ withclosing(sqlite3.connect(self.db_name))asconn: c=conn.cursor() # 猫データ sql="INSERT INTO TBL_TICKER VALUES (?,?,?,?,?,?,?,?,?,?,?,?)" c.execute(sql,ticker) c.close() conn.commit() |
起動、データ確認
新規作成した BackgroundSaveTickerを起動しデータを確認
A5SQLは、いちいち接続先の設定を作らなくてはならず面倒だったので、新しく「PupSQLite」というツールを使ってみる。
無事に1分毎のデータが取れました。
ただ、これでいいのだろうか。。
まとめ
pubnubによる配信(タイミング)でデータ登録するのではなく、こちらからタイミングを決めてデータを登録する方法でした。
まだちょっと非同期やスレッドの部分であいまいなところがあり躓いてしまったのでそこらへんも勉強していこうと思います。
ではでは。
ディスカッション
コメント一覧
まだ、コメントがありません