Skip to content
This repository has been archived by the owner on Nov 13, 2023. It is now read-only.

Commit

Permalink
Add OrderBook and make bitstamp use it
Browse files Browse the repository at this point in the history
  • Loading branch information
pjz committed Aug 9, 2016
1 parent 2f24f55 commit 51c307f
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 33 deletions.
4 changes: 2 additions & 2 deletions pyalgotrade/bitstamp/livefeed.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ def getOrderBookUpdateEvent(self):
"""
Returns the event that will be emitted when the orderbook gets updated.
Eventh handlers should receive one parameter:
1. A :class:`pyalgotrade.bitstamp.wsclient.OrderBookUpdate` instance.
Event handlers should receive one parameter:
1. A :class:`pyalgotrade.orderbook.MarketSnapshot` instance.
:rtype: :class:`pyalgotrade.observer.Event`.
"""
Expand Down
48 changes: 23 additions & 25 deletions pyalgotrade/bitstamp/wsclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@
.. moduleauthor:: Gabriel Martin Becedillas Ruiz <[email protected]>
"""

import json
import datetime
import threading
import Queue
from decimal import Decimal

from pyalgotrade.websocket import pusher
from pyalgotrade.bitstamp import common

from pyalgotrade.orderbook import Bid, Ask, MarketSnapshot, Assign

VENUE = 'bitstamp'


def get_current_datetime():
return datetime.datetime.now()
Expand Down Expand Up @@ -64,32 +70,24 @@ def isSell(self):
return self.getData()["type"] == 1


class OrderBookUpdate(pusher.Event):
"""An order book update event."""

def __init__(self, dateTime, eventDict):
super(OrderBookUpdate, self).__init__(eventDict, True)
self.__dateTime = dateTime

def getDateTime(self):
"""Returns the :class:`datetime.datetime` when this event was received."""
return self.__dateTime

def getBidPrices(self):
"""Returns a list with the top 20 bid prices."""
return [float(bid[0]) for bid in self.getData()["bids"]]

def getBidVolumes(self):
"""Returns a list with the top 20 bid volumes."""
return [float(bid[1]) for bid in self.getData()["bids"]]
def toBookMessages(bitstamp_json, symbol):
"""convert a bitstamp json message into a list of book messages"""
msg = bitstamp_json
if type(msg) != type({}):
msg = json.loads(msg)
rts = msg.get('timestamp', get_current_datetime())
result = []
for side, skey in ((Bid, "bids"), (Ask, "asks")):
for price, size in msg[skey]:
result.append(Assign(rts, VENUE, symbol, Decimal(price), Decimal(size), side))
return result

def getAskPrices(self):
"""Returns a list with the top 20 ask prices."""
return [float(ask[0]) for ask in self.getData()["asks"]]

def getAskVolumes(self):
"""Returns a list with the top 20 ask volumes."""
return [float(ask[1]) for ask in self.getData()["asks"]]
def bookToSnapshot(bitstamp_json, symbol):
"""convert a bitstamp json book into a MarketSnapshot"""
ts = get_current_datetime()
data = toBookMessages(bitstamp_json, symbol)
return MarketSnapshot(ts, VENUE, symbol, data)


class WebSocketClient(pusher.WebSocketClient):
Expand All @@ -114,7 +112,7 @@ def onMessage(self, msg):
if event == "trade":
self.onTrade(Trade(get_current_datetime(), msg))
elif event == "data" and msg.get("channel") == "order_book":
self.onOrderBookUpdate(OrderBookUpdate(get_current_datetime(), msg))
self.onOrderBookUpdate(bookToSnapshot(msg['data'], 'BTCUSD'))
else:
super(WebSocketClient, self).onMessage(msg)

Expand Down
194 changes: 194 additions & 0 deletions pyalgotrade/orderbook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
"""
An intentionally simplistic order book implementation
"""

import time

from enum import Enum
from collections import namedtuple
from sortedcontainers import SortedDict


class Side(Enum):
Bid = 1
bid = 1
Ask = 2
ask = 2

Bid, Ask = Side.Bid, Side.Ask

# These classes are used to update the OrderBook

AbstractMarketDataWrapper = namedtuple('AbstractMarketDataWrapper', 'ts venue symbol data')
AbstractMarketDataWrapper.__new__.__defaults__ = (0, '', '', [])

class MarketUpdate(AbstractMarketDataWrapper):
"""An incremental change to the order book"""
pass
class MarketSnapshot(AbstractMarketDataWrapper):
"""A new full order book definition"""
# unenforced, but data should only contain Assigns
pass

# Market{Update,Snapshot} are really just wrappers for these (within .data)
AbstractMarketDataDelta = namedtuple('AbstractMarketDataDelta', 'rts venue symbol price size side')
AbstractMarketDataDelta.__new__.__defaults__ = (0, '', '', 0, 0, Ask)

class Assign(AbstractMarketDataDelta):
"""Set a price/size/side"""
pass
class Increase(AbstractMarketDataDelta):
"""Increase the price/side/size by size"""
pass
class Decrease(AbstractMarketDataDelta):
"""Decrease the price/side/size by size"""
pass

PriceLevel = Assign

# helper object to add some structure

Inside = namedtuple('Inside', 'bid ask')

# The actual OrderBook object

class OrderBook():
"""
Generic book; understands only common messages (for updating the book)
Note: prices and sizes are Decimals (already decoded). Implements L1/L2.
"""
def __init__(self, venue=None, symbol=None):
self.venue = venue
self.symbol = symbol
self.reset()

def reset(self):
"""Reset the OrderBook to an empty state"""
self.bids = SortedDict(lambda k:-k, {}) # maps price: PriceLevel(size, tick)
self.asks = SortedDict({}) # maps price: PriceLevel(size, tick)
self.last = None # the last MarketUpdate or MarketSnapshot

def is_empty(self):
"""returns True iff the OrderBook is empty"""
return self.last is None
#return not (self.bids and self.asks)

def update(self, update):
"""Update the OrderBook with the given update
(either a MarketSnapshot or a MarketUpdate)
"""

def set_pricelevel(side, assign):
ap = assign.price
if assign.size > 0: side[ap] = assign
elif ap in side: del side[ap]
s_pl = set_pricelevel

def make_assign(update, **kwargs):
assign = update._asdict()
assign.update(kwargs)
return PriceLevel(**assign)
mk_a = make_assign

g_sz = lambda s, p: s.get(p, PriceLevel()).size

# check type(update) == MarketUpdate ?
if type(update) == MarketSnapshot: self.reset()
for t in update.data:
tt = type(t)
s = { Ask: self.asks, Bid: self.bids }.get(t.side, None)
if s is None: raise ValueError("Unknown side: %r" % t.side)
tp, ts = t.price, t.size
if tt == Assign: s_pl(s, t)
elif tt == Increase: s_pl(s, mk_a(t, size=g_sz(s, tp) + ts))
elif tt == Decrease: s_pl(s, mk_a(t, size=g_sz(s, tp) - ts))
else: raise ValueError("Unknown type %r of %r" % (type(t), t))

self.last = update
return self

def get_marketsnapshot(self):
"""Return the OrderBook as a MarketSnapshot"""
data = self.bids.values() + self.asks.values()
return MarketSnapshot(time.time(), self.venue, self.symbol, data)

@classmethod
def from_snapshot(cls, snapshot):
"""Create the OrderBook from a MarketSnapshot"""
return cls(snapshot.venue, snapshot.symbol).update(snapshot)

@property
def inside(self):
"""Return the closest bid and ask PriceLevels"""
return Inside(self.inside_bid(), self.inside_ask())

@property
def inside_bid(self):
"""Return the highest bid PriceLevel"""
try:
return self.bids[self.bids.iloc[0]]
except IndexError:
print("!!! Book for venue %s:%s bids are empty!!"%(self.venue, self.symbol))
raise

@property
def inside_ask(self):
"""Return the lowest ask PriceLevel"""
try:
return self.asks[self.asks.iloc[0]]
except IndexError:
print("!!! Book for venue %s:%s asks are empty!!"%(self.venue, self.symbol))
raise

def nvolume(self, nlevels=None):
""" return the inside <nlevels> levels on each side of the book
nlevels = None (the default) means 'all'
"""
bids = self.bids.values()[:nlevels]
asks = self.asks.values()[:nlevels]
return { 'bids': list(bids), 'asks': list(asks) }

def price_for_size(self, side, size):
"""
The cost of the specifed size on the specified side.
Note that this is not 'to fill an order on the specified side',
because Asks fill Bid orders and vice versa.
"""
pside = { Bid: self.bids, Ask: self.asks }[side]
sizeleft = size
value = 0
for price in pside:
pl = pside[price]
s = min(sizeleft, pl.size)
value += s * price
sizeleft -= s
if not sizeleft: break
return value

def npfs(self, size):
"""
Normalized Price For Size
"""
return self.price_for_size(Bid, size)/self.price_for_size(Ask, size)

def size_for_price(self, side, price):
"""
How much size the specified price is worth on the specified side.
"""
pside = { Bid: self.bids, Ask: self.asks }[side]
priceleft = price
size = 0
for price in pside:
pl = pside[price]
p = price * pl.size
if p > priceleft:
priceleft -= p
size += pl.size
else:
size += priceleft / price
break
return size

def nsfp(self, price):
"""Normalized Size For Price"""
return self.size_for_price(Bid, price)/self.size_for_price(Ask, price)
8 changes: 5 additions & 3 deletions samples/tutorial_bitstamp_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pyalgotrade import strategy
from pyalgotrade.technical import ma
from pyalgotrade.technical import cross

from pyalgotrade.orderbook import OrderBook

class Strategy(strategy.BaseStrategy):
def __init__(self, feed, brk):
Expand All @@ -16,13 +16,15 @@ def __init__(self, feed, brk):
self.__ask = None
self.__position = None
self.__posSize = 0.05
self.__book = OrderBook()

# Subscribe to order book update events to get bid/ask prices to trade.
feed.getOrderBookUpdateEvent().subscribe(self.__onOrderBookUpdate)

def __onOrderBookUpdate(self, orderBookUpdate):
bid = orderBookUpdate.getBidPrices()[0]
ask = orderBookUpdate.getAskPrices()[0]
self.__book.update(orderBookUpdate)
bid = self.__book.inside_bid()
ask = self.__book.inside_ask()

if bid != self.__bid or ask != self.__ask:
self.__bid = bid
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
'pyalgotrade.websocket',
],
install_requires=[
"enum34",
"numpy",
"pytz",
"python-dateutil",
Expand Down
7 changes: 4 additions & 3 deletions testcases/bitstamp_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from pyalgotrade.bitcoincharts import barfeed as btcbarfeed
from pyalgotrade import strategy
from pyalgotrade import dispatcher

from pyalgotrade.orderbook import OrderBook

class WebSocketClientThreadMock(threading.Thread):
def __init__(self, events):
Expand Down Expand Up @@ -196,8 +196,9 @@ def __init__(self, feed, brk):
feed.getOrderBookUpdateEvent().subscribe(self.__onOrderBookUpdate)

def __onOrderBookUpdate(self, orderBookUpdate):
bid = orderBookUpdate.getBidPrices()[0]
ask = orderBookUpdate.getAskPrices()[0]
book = OrderBook.from_snapshot(orderBookUpdate)
bid = book.inside_bid().price
ask = book.inside_ask().price

if bid != self.bid or ask != self.ask:
self.bid = bid
Expand Down

0 comments on commit 51c307f

Please sign in to comment.