【Python】pubnub と WebSocket で更にリアルタイムで情報を取得する
おはようございます。
えーと、SQLiteにデータ登録なんかしようと思ってたんですが、
どう実装すればいいか分からず別の方法で実装した、pubnub で ティッカー情報が更新される度に画面の情報も更新する方法が分かりましたのでやってみました。
プログラムは前回のものを流用します。
【Python】bitflyer の API を使って注文を送信する
スポンサーリンク
目次
画面の修正
Main.html
ティッカー情報の配信を開始するボタンと停止するボタンを追加しました。
<div style="clear:both; padding-top:10px;"> <div class="entry_title"> <div class="pull_left">ティッカー情報</div> <div class="pull_right"> <input type="button" value="更新開始" /> <input type="button" value="更新停止" /> </div> </div> <table id="tickerTable"> <tr class="header"> <th style="width:5%">種別</th> <th style="width:10%">時刻</th> <th style="width:5%">ID</th> <th style="width:5%">売値</th> <th style="width:5%">買値</th> <th style="width:10%">売り数量</th> <th style="width:10%">買い数量</th> <th style="width:10%">売り注文総数</th> <th style="width:10%">買い注文総数</th> <th style="width:10%">最終取引価格</th> <th style="width:10%">出来高</th> <th style="width:10%">価格単位出来高</th> </tr> </table> </div>
プログラムの修正
APIの修正
BfApi.py
インポートするクラス
import hashlib import hmac import json import logging import requests import time import urllib from common.Constants import Constants from pubnub.callbacks import SubscribeCallback from pubnub.enums import PNStatusCategory from pubnub.pnconfiguration import PNConfiguration from pubnub.pubnub import SubscribeListener from pubnub.pubnub_tornado import PubNubTornado
追加するメソッド、内部クラス
def start_pub_nub_ticker(self, cl, channels): """ ティッカー情報の配信を開始 :param cl: web_socket client :param channels: 配信するチャンネル :return: """ self.listener = self.MySubscriberCallback(cl) self.pub_nub.add_listener(self.listener) self.pub_nub.subscribe().channels(channels).execute() def stop_pub_nub_ticker(self, channels): """ ティッカー情報の配信を停止 :param channels: 停止するチャンネル :return: """ self.pub_nub.unsubscribe().channels(channels).execute() self.pub_nub.remove_listener(self.listener) class MySubscriberCallback(SubscribeCallback): """ Pubnub登録のコールバッククラス """ def __init__(self, clent): self.cl = clent def presence(self, pubnub, presence): pass # handle incoming presence data def status(self, pubnub, status): if status.category == PNStatusCategory.PNUnexpectedDisconnectCategory: pass # This event happens when radio / connectivity is lost elif status.category == PNStatusCategory.PNConnectedCategory: # Connect event. You can do stuff like publish, and know you'll get it. # Or just use the connected event to confirm you are subscribed for # UI / internal notifications, etc pass elif status.category == PNStatusCategory.PNReconnectedCategory: pass # Happens as part of our regular operation. This event happens when # radio / connectivity is lost, then regained. elif status.category == PNStatusCategory.PNDecryptionErrorCategory: pass # Handle message decryption error. Probably client configured to # encrypt messages and on live data feed it received plain text. def message(self, pubnub, message): """ 登録したチャンネルからメッセージを受信した際の処理 :param pubnub: :param message: :return: """ # WEBソケットを利用してクライアントに配信 for c in self.cl: c.write_message(message.message)
メインクラスのグローバル変数を追加
BfTool.py
cl = [] api_ticker = None
WEBソケットの修正
BfTool.py
ひとまずは、既存のティッカー情報更新用メソッドはコメントアウトし、
新しい仕組みを追加しました。
class SendWebSocket(WebSocketHandler): """ WEBソケット通信 1秒毎にティッカー情報を画面に配信します """ def open(self): logging.info('SendWebSocket [open] IP :' + self.request.remote_ip) self.ioloop = tornado.ioloop.IOLoop.instance() # self.send_ticker() if self not in cl: cl.append(self) def on_message(self, message): logging.info("WebSocketHandler [on_message]") for client in cl: client.write_message(message) def on_close(self): logging.info("SendWebSocket [on_closed]") if self in cl: cl.remove(self) def check_origin(self, origin): logging.info("SendWebSocket [check_origin]") return True def send_ticker(self): self.ioloop.add_timeout(time.time() + 2, self.send_ticker) if self.ws_connection: api = BfApi() data = api.call_pub_nub('lightning_ticker_FX_BTC_JPY') message = json.dumps(data) self.write_message(message)
ティッカー情報配信の開始と停止メソッド追加
BfTool.py
画面から開始、停止ができるようにURLメソッドを追加しました。
class StartTicker(RequestHandler): """ ティッカー情報の更新を開始 """ def initialize(self): logging.info("StartTicker [initialize]") def post(self): logging.info("StartTicker [post]") global api_ticker api_ticker = BfApi() api_ticker.start_pub_nub_ticker(cl, ['lightning_ticker_FX_BTC_JPY']) class StopTicker(RequestHandler): """ ティッカー情報の更新を停止 """ def initialize(self): logging.info("StopTicker [initialize]") def post(self): logging.info("StopTicker [post]") global api_ticker if api_ticker: api_ticker.stop_pub_nub_ticker('lightning_ticker_FX_BTC_JPY')
URLマッピングの追加
BfTool.py
app = tornado.web.Application([ (r"/", MainHandler), (r"/ws", SendWebSocket), (r"/balance", GetBalanceHandler), (r"/execution", GetExecutionHandler), (r"/childOrder", GetChildOrderHandler), (r"/sendOrder", SendChildOrderHandler), (r"/cancelOrder", CancelChildOrderHandler), (r"/startTicker", StartTicker), (r"/stopTicker", StopTicker) ], template_path=os.path.join(os.getcwd(), "templates"), static_path=os.path.join(os.getcwd(), "static"), js_path=os.path.join(os.getcwd(), "js"), )
画面処理の追加
script.js
/** * ティッカーの更新を開始 */ function startTicker() { $.ajax({ url: "http://localhost:8080/startTicker", type: "POST", success: function(jsonResponse) { console.log(jsonResponse); }, error: function() { } }); } /** * ティッカーの更新を停止 */ function stopTicker() { $.ajax({ url: "http://localhost:8080/stopTicker", type: "POST", success: function(jsonResponse) { console.log(jsonResponse); }, error: function() { } }); }
起動してみる
スクショじゃ分からないですが、すごい速さでティッカー情報が更新されていきます。
この速度では10件表示していてもすぐ流れて行ってしまうので、本来であれば最新のみ表示するとかで十分ですね。
あとは、自動取引のトリガーとすることが多いようです。
そのうち自動取引の条件なんかを画面で設定し、自動取引の開始・停止が出来るようにしたいですね。
ディスカッション
コメント一覧
まだ、コメントがありません