This repository has been archived indefinitely.
If you're looking for something similar that is barebones and gets the grunt work out of the way, then you can look at my ledger-api repository. I will be working on, and maintaining, that project instead.
The ledger-api repository is a minimalist backend REST API for my main ledger repository. It currently supports Coinbase Pro and Kraken as of this final commit.
Future plans are to support Coinbase, Gemini, 1inch, Uniswap, Coinbase Wallet, Metamask Wallet, anything Web3 related, and more.
Thank you for the positive feedback. The community has been helpful and welcoming. I apologize for any inconvenience this may have caused.
Best Regards.
A Python 3 Client Wrapper for the Coinbase Pro Rest API
- Requires Python 3.6 or greater
- Provided under MIT License by Daniel Paquin.
- A simple-to-use Python wrapper for both public and authenticated endpoints.
- In about 10 minutes, you could be programmatically trading on one of the largest Bitcoin exchanges in the world!
- Do not worry about handling the nuances of the API: there are easy-to-use methods for every API endpoint.
- Gain an advantage in the market by getting under the hood of CB Pro to learn what and who is behind every tick.
- Private Client and Websocket Tests
- Web Socket Client
- Real-Time Order Book
- FIX API Client (looking for assistance)
- This library is a fork of the original.
- This library may be subtly broken or buggy.
- If you are here looking for the original GDAX project, then you can find it here.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
- Getting Started
- Install
- Core API
- Public API
- Private API
- Websocket Feed
- Real-Time Order Book
- Testing
- Change Log
This README.md is documentation on the syntax of the Python client presented in this repository.
See both Requests and Coinbase Pro API Documentation for full details.
WARNING: It's recommended that you use the websocket interface instead of polling the general interface methods. Reference the Coinbase Pro API Docs to see the niche cases where polling is allowed. In most other cases, polling can result in your access to the API being blocked or even banned.
NOTE: This library conflicts with the original coinbasepro-python package. Make sure to remove it before installing this package to avoid package conflicts.
- Install manually
git clone https://github.com/teleprint-me/coinbasepro-python.git
cd coinbasepro-python
virtualenv venv
source venv/bin/activate
python setup.py install
- Install with pip
mkdir my_client_project
cd my_client_project
virtualenv venv
source venv/bin/activate
pip install git+https://github.com/teleprint-me/coinbasepro-python.git
cbpro.auth.Auth(key: str, secret: str, passphrase: str)
Use the Auth object to authenticate yourself with private endpoints. The Auth object is a callable object that is passed to a requests method.
Example:
import cbpro
import requests
key = 'My Key'
secret = 'My Secret'
passphrase = 'My Passphrase'
sandbox = 'https://api-public.sandbox.pro.coinbase.com'
endpoint = '/accounts'
url = sandbox + endpoint
params = None
auth = cbpro.Auth(key, secret, passphrase)
response = requests.get(
    url,
    params=params,
    auth=auth,
    timeout=30
)
json = response.json()
print(json)
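For orientation, the signing step that an auth callable of this kind performs can be sketched with the standard library alone. The sketch assumes the header scheme described in the Coinbase Pro API documentation (a base64-encoded HMAC-SHA256 over timestamp + method + request path + body). `sign_request` is a hypothetical helper, not part of `cbpro`, and the library's own `Auth` implementation may differ.

```python
import base64
import hashlib
import hmac
import time


def sign_request(key, secret, passphrase, method, path, body='', timestamp=None):
    """Build Coinbase Pro style auth headers (hypothetical helper, not cbpro's API)."""
    # The timestamp is seconds since the Unix epoch, sent as a string.
    timestamp = str(time.time()) if timestamp is None else str(timestamp)
    # The message to sign is timestamp + METHOD + request path + body.
    message = (timestamp + method.upper() + path + body).encode('utf-8')
    # The API secret is base64 encoded; decode it before using it as the HMAC key.
    digest = hmac.new(base64.b64decode(secret), message, hashlib.sha256).digest()
    return {
        'CB-ACCESS-KEY': key,
        'CB-ACCESS-SIGN': base64.b64encode(digest).decode('utf-8'),
        'CB-ACCESS-TIMESTAMP': timestamp,
        'CB-ACCESS-PASSPHRASE': passphrase,
        'Content-Type': 'application/json',
    }


# Placeholder credentials for illustration only; the secret must be valid base64.
demo_secret = base64.b64encode(b'not-a-real-secret').decode('utf-8')
headers = sign_request('My Key', demo_secret, 'My Passphrase', 'GET', '/accounts',
                       timestamp=1617235200)
print(headers['CB-ACCESS-TIMESTAMP'])  # 1617235200
```

Passing a fixed `timestamp` is only for reproducibility in the example; a real request would use the current time.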
cbpro.messenger.Messenger(auth: cbpro.auth.Auth = None,
url: str = None,
timeout: int = None)
The Messenger object is a requests wrapper. It handles most of the common repeated tasks for you.
The Messenger object methods will return a dict in most cases. Methods may also return a list. The Messenger.paginate method returns a generator.
The Messenger object defaults to using the REST API URL. It is recommended to use the Sandbox REST API URL instead for testing.
Example:
import cbpro
key = 'My Key'
secret = 'My Secret'
passphrase = 'My Passphrase'
auth = cbpro.Auth(key, secret, passphrase)
sandbox = 'https://api-public.sandbox.pro.coinbase.com'
endpoint = '/accounts'
messenger = cbpro.Messenger(auth=auth, url=sandbox)
accounts = messenger.get(endpoint)
print(accounts)
messenger.paginate(endpoint: str, params: dict = None) -> object
The Messenger interface provides a method for paginated endpoints, where multiple calls must be made to receive the full set of data.
Messenger.paginate returns a generator, which provides a clean interface for iteration and may make multiple requests behind the scenes.
The pagination options before, after, and limit may be supplied in params if desired, but are not necessary for typical use cases.
Example:
import cbpro
key = 'My Key'
secret = 'My Secret'
passphrase = 'My Passphrase'
sandbox = 'https://api-public.sandbox.pro.coinbase.com'
endpoint = '/fills'
auth = cbpro.Auth(key, secret, passphrase)
messenger = cbpro.Messenger(auth=auth, url=sandbox)
params = {'product_id': 'BTC-USD', 'before': 2}
fills = messenger.paginate(endpoint, params)
for fill in fills:
    print(fill)
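The lazy behaviour of a paginating generator can be pictured with a small stand-alone sketch. Everything here is an assumption for illustration: `fetch_page` is a hypothetical stand-in for one HTTP request that returns a page of items plus the cursor for the next page (Coinbase Pro reports that cursor in the `CB-AFTER` response header). It is not necessarily how `Messenger.paginate` is implemented.

```python
def paginate(fetch_page, params=None):
    """Yield items one at a time, requesting further pages only when needed."""
    params = dict(params or {})
    while True:
        items, after = fetch_page(params)
        yield from items
        # Stop when the server reports no further cursor or an empty page.
        if not after or not items:
            break
        params['after'] = after


# Hypothetical in-memory "server": three pages keyed by cursor.
PAGES = {
    None: ([{'trade_id': 6}, {'trade_id': 5}], 'c1'),
    'c1': ([{'trade_id': 4}, {'trade_id': 3}], 'c2'),
    'c2': ([{'trade_id': 2}, {'trade_id': 1}], None),
}
calls = []


def fetch_page(params):
    # Record which cursor was requested so the laziness is visible.
    calls.append(params.get('after'))
    return PAGES[params.get('after')]


fills = paginate(fetch_page, {'product_id': 'BTC-USD'})
first_three = [next(fills)['trade_id'] for _ in range(3)]
print(first_three)  # [6, 5, 4]
print(calls)        # [None, 'c1']
```

Only two pages have been requested after three items, which is why breaking out of the loop early avoids unnecessary requests.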
A Messenger instance is passed to the PublicClient or PrivateClient objects and then shared among the related classes during instantiation of client related objects. Each instance has its own memory and shares a reference to the same Messenger instance.
Any objects that subscribe to the Subscriber interface inherit the initialization of the Subscriber.messenger property, allowing them to effectively communicate via the same Auth and requests.Session objects without duplicating code.
# ...
class Time(cbpro.messenger.Subscriber):
    def get(self) -> dict:
        return self.messenger.get('/time')


class PublicClient(object):
    def __init__(self, messenger: cbpro.messenger.Messenger) -> None:
        self.products = Products(messenger)
        self.currencies = Currencies(messenger)
        self.time = Time(messenger)
# ...
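The sharing described above can be demonstrated without any network access. The classes below are hypothetical stand-ins that only mirror the names used in this README. The stand-in `Messenger` records the endpoints it is asked for instead of sending requests.

```python
class Messenger:
    """Stand-in for cbpro.Messenger that records calls instead of sending requests."""

    def __init__(self):
        self.requested = []

    def get(self, endpoint):
        self.requested.append(endpoint)
        return {'endpoint': endpoint}


class Subscriber:
    """Base class that stores the shared messenger reference."""

    def __init__(self, messenger):
        self.messenger = messenger


class Time(Subscriber):
    def get(self):
        return self.messenger.get('/time')


class Currencies(Subscriber):
    def list(self):
        return self.messenger.get('/currencies')


class PublicClient:
    def __init__(self, messenger):
        # Every endpoint group receives the same messenger object.
        self.currencies = Currencies(messenger)
        self.time = Time(messenger)


messenger = Messenger()
public = PublicClient(messenger)
public.time.get()
public.currencies.list()
print(public.time.messenger is public.currencies.messenger)  # True
print(messenger.requested)  # ['/time', '/currencies']
```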
cbpro.public.PublicClient(messenger: cbpro.messenger.Messenger)
Only some endpoints in the API are available to everyone. The public endpoints can be reached using the cbpro.public.PublicClient object.
Example 1:
import cbpro
messenger = cbpro.Messenger()
public = cbpro.PublicClient(messenger)
cbpro.public.public_client(url: str = None)
You can use the public_client function in most cases to simplify instantiation. Example 2 takes only one line of code where Example 1 takes two. Both return an instantiated PublicClient object.
Example 2:
import cbpro
public = cbpro.public_client()
cbpro.public.Products(messenger: cbpro.messenger.Messenger)
public.products.list() -> list
Example:
products = public.products.list()
type(products) # <class 'list'>
len(products) # 193
public.products.get(product_id: str) -> dict
Example:
product_id = 'BTC-USD'
product = public.products.get(product_id)
type(product) # <class 'dict'>
print(product)
# NOTE:
# - Polling is discouraged for this method
# - Use the websocket stream instead of polling
public.products.order_book(product_id: str, params: dict) -> dict
Example:
params = {'level': 1}
book = public.products.order_book(product_id, params)
type(book) # <class 'dict'>
print(book)
# NOTE:
# - Polling is discouraged for this method
# - Use the websocket stream instead of polling
public.products.ticker(product_id: str) -> dict
# NOTE:
# - This request is paginated
public.products.trades(product_id: str, params: dict = None) -> object
Example:
params = {'before': 2}
trades = public.products.trades(product_id, params)
type(trades) # <class 'generator'>
print(trades) # <generator object Messenger.paginate at 0x7f9b0ac24cf0>
for index, trade in enumerate(trades):
    if index == 10:
        break
    print(trade)
# NOTE:
# - Polling is discouraged for this method
# - Use the websocket stream instead of polling
public.products.history(product_id: str, params: dict) -> list
Example:
import datetime
start = datetime.datetime(2021, 4, 1, 0, 0, 0, 0).isoformat()
end = datetime.datetime(2021, 4, 7, 0, 0, 0, 0).isoformat()
day = 86400
params = {'start': start, 'end': end, 'granularity': day}
history = public.products.history(product_id, params)
type(history) # <class 'list'>
len(history) # 7
print(history)
This interface allows querying of more than 300 candles by sending multiple requests if the requested time range is too large.
The Coinbase Pro API - Get Historic Rates section describes it as follows:
The maximum number of data points for a single request is 300 candles. If your selection of start/end time and granularity will result in more than 300 data points, your request will be rejected. If you wish to retrieve fine granularity data over a larger time range, you will need to make multiple requests with new start/end ranges.
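One way to stay under the 300-candle limit is to split the requested range into consecutive windows and issue one request per window. The helper below is a minimal sketch of that idea. `candle_windows` is a hypothetical name, and how the API treats the window boundaries (inclusive or exclusive) is not verified here.

```python
import datetime


def candle_windows(start, end, granularity, max_candles=300):
    """Split [start, end) into consecutive windows of at most max_candles candles."""
    step = datetime.timedelta(seconds=granularity * max_candles)
    windows = []
    cursor = start
    while cursor < end:
        # Clamp the last window so it never runs past the requested end.
        stop = min(cursor + step, end)
        windows.append((cursor, stop))
        cursor = stop
    return windows


start = datetime.datetime(2021, 1, 1)
end = start + datetime.timedelta(days=301)
windows = candle_windows(start, end, granularity=86400)
print(len(windows))  # 2
for window_start, window_end in windows:
    print(window_start.isoformat(), window_end.isoformat())
```

Each `(start, end)` pair could then be passed as the `start` and `end` parameters of a separate history request, and the results concatenated.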
# NOTE:
# - Polling is discouraged for this method
# - Use the websocket stream instead of polling
public.history.candles(product_id: str, params: dict) -> list
Example:
import datetime
import cbpro
n_days = 301
granularity = 86400 # daily
end = datetime.datetime.now()
start = end - datetime.timedelta(days=n_days)
params = {'start': start.isoformat(), 'end': end.isoformat(), 'granularity': granularity}
messenger = cbpro.Messenger()
public = cbpro.PublicClient(messenger)
product_id = "BTC-USD"
history = public.history.candles(product_id, params)
type(history) # <class 'list'>
len(history) # 301
print(history)
public.products.stats(product_id: str) -> dict
cbpro.public.Currencies(messenger: cbpro.messenger.Messenger)
public.currencies.list() -> list
public.currencies.get(product_id: str) -> dict
cbpro.public.Time(messenger: cbpro.messenger.Messenger)
public.time.get() -> dict
cbpro.models.PublicModel()
Use PublicModel to generate passable parameters easily.
Models help ensure your code conforms to what the API expects. If a parameter is incorrect, or forgotten, then the model will raise an AssertionError.
This helps in separating the application logic from the client interface, leaving the client objects clean and tidy.
Example:
import cbpro
model = cbpro.PublicModel()
cbpro.models.ProductsModel()
model.products.order_book(level: int = None) -> dict
Example 1:
try:  # intentionally trigger
    params = model.products.order_book(5)
except AssertionError as e:
    print('AssertionError:', e)
# AssertionError: `level` must be one of: [1, 2, 3]
Example 2:
params = model.products.order_book(2)
print(params) # {'level': 2}
book = public.products.order_book(product_id, params=params)
type(book) # <class 'dict'>
print(book)
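To illustrate the kind of check a model performs, here is a stand-alone sketch of a parameter builder that validates its input with an assertion. `order_book_params` is a hypothetical function written for this example. It imitates the behavior shown above but is not the library's code.

```python
def order_book_params(level=None):
    """Return query parameters for an order book request, validating `level`."""
    allowed = [1, 2, 3]
    if level is None:
        # No level given: send no parameter and let the API use its default.
        return {}
    assert level in allowed, f'`level` must be one of: {allowed}'
    return {'level': level}


print(order_book_params(2))  # {'level': 2}

try:
    order_book_params(5)
except AssertionError as error:
    print('AssertionError:', error)
    # AssertionError: `level` must be one of: [1, 2, 3]
```

Note that `assert` statements are skipped when Python runs with the `-O` flag, so assertion-based validation of this kind is only active in non-optimized runs.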
model.products.history(start: str = None,
end: str = None,
granularity: int = 86400) -> dict
Example:
import datetime
start = datetime.datetime(2021, 4, 1, 0, 0, 0, 0).isoformat()
end = datetime.datetime(2021, 4, 7, 0, 0, 0, 0).isoformat()
params = model.products.history(start, end)
print(params)
# {'start': '2021-04-01T00:00:00', 'end': '2021-04-07T00:00:00', 'granularity': 86400}
history = public.products.history(product_id, params)
type(history) # <class 'list'>
len(history) # 7
print(history)
cbpro.private.PrivateClient(messenger: cbpro.messenger.Messenger)
Not all API endpoints are available to everyone.
Those requiring user authentication can be reached using the PrivateClient object. You must set up API access within your Account Settings.
NOTE: The PrivateClient object inherits from the PublicClient object.
Example 1:
import cbpro
key = 'My Key'
secret = 'My Secret'
passphrase = 'My Passphrase'
sandbox = 'https://api-public.sandbox.pro.coinbase.com'
auth = cbpro.Auth(key, secret, passphrase)
messenger = cbpro.Messenger(auth=auth, url=sandbox)
private = cbpro.PrivateClient(messenger)
cbpro.private.private_client(key: str,
secret: str,
passphrase: str,
url: str = None) -> PrivateClient:
You can use the private_client function in most cases to simplify instantiation. Example 2 takes only one line of code where Example 1 takes three. Both return an instantiated PrivateClient object.
Example 2:
import cbpro
key = 'My Key'
secret = 'My Secret'
passphrase = 'My Passphrase'
sandbox = 'https://api-public.sandbox.pro.coinbase.com'
private = cbpro.private_client(key, secret, passphrase, sandbox)
cbpro.private.Accounts(messenger: cbpro.messenger.Messenger)
private.accounts.list() -> list
private.accounts.get(account_id: str) -> dict
# NOTE:
# - This request is paginated
private.accounts.history(account_id: str, params: dict = None) -> list
# NOTE:
# - This request is paginated
private.accounts.holds(account_id: str, params: dict = None) -> list
cbpro.private.Orders(messenger: cbpro.messenger.Messenger)
private.orders.post(json: dict) -> dict
Example: Limit Order
limit = {
'side': 'buy',
'product_id': 'BTC-USD',
'type': 'limit',
'price': 57336.2,
'size': 0.001
}
private.orders.post(limit)
Example: Market Order
market = {
'side': 'buy',
'product_id': 'BTC-USD',
'type': 'market',
'funds': 100.0
}
private.orders.post(market)
Example: Limit Stop Order
stop = {
'side': 'buy',
'product_id': 'BTC-USD',
'type': 'limit',
'stop': 'loss',
'stop_price': 50000.0,
'price': 57064.8,
'size': 0.001
}
private.orders.post(stop)
Example: Market Stop Order
stop = {
'side': 'buy',
'product_id': 'BTC-USD',
'type': 'market',
'stop': 'loss',
'stop_price': 45000.0,
'funds': 100.0
}
private.orders.post(stop)
private.orders.cancel(id_: str, params: dict = None) -> list
private.orders.cancel_client(oid: str, params: dict = None) -> list
private.orders.cancel_all(params: dict = None) -> list
# NOTE:
# - This request is paginated
private.orders.list(params: dict) -> list
private.orders.get(id_: str) -> dict
private.orders.get_client(oid: str) -> dict
cbpro.private.Fills(messenger: cbpro.messenger.Messenger)
# NOTE:
# - This request is paginated
# - You are required to provide either a `product_id` or `order_id`
private.fills.list(params: dict) -> list
Example 1:
product_id = {'product_id': 'BTC-USD'}
private.fills.list(product_id)
Example 2:
order_id = {'order_id': '0e953c31-9bce-4007-978c-302be337b566'}
private.fills.list(order_id)
cbpro.private.Limits(messenger: cbpro.messenger.Messenger)
private.limits.get() -> dict
cbpro.private.Deposits(messenger: cbpro.messenger.Messenger)
# NOTE:
# - This request is paginated
private.deposits.list(params: dict = None) -> list
private.deposits.get(transfer_id: str) -> dict
private.deposits.payment(json: dict) -> dict
private.deposits.coinbase(json: dict) -> dict
private.deposits.generate(account_id: str) -> dict
# NOTE:
# - `cbpro.private.Withdrawals` inherits from `cbpro.private.Deposits`
cbpro.private.Withdrawals(messenger: cbpro.messenger.Messenger)
# NOTE:
# - This method is overridden
private.withdrawals.payment(json: dict) -> dict
# NOTE:
# - This method is overridden
private.withdrawals.coinbase(json: dict) -> dict
private.withdrawals.crypto(json: dict) -> dict
private.withdrawals.estimate(params: dict) -> dict
cbpro.private.Conversions(messenger: cbpro.messenger.Messenger)
private.conversions.post(json: dict) -> dict
cbpro.private.Payments(messenger: cbpro.messenger.Messenger)
private.payments.list() -> list
cbpro.private.Coinbase(messenger: cbpro.messenger.Messenger)
private.coinbase.list() -> list
cbpro.private.Fees(messenger: cbpro.messenger.Messenger)
private.fees.list() -> list
# NOTE:
# - This object is currently undefined
cbpro.private.Reports(messenger: cbpro.messenger.Messenger)
cbpro.private.Profiles(messenger: cbpro.messenger.Messenger)
private.profiles.list(params: dict = None) -> list
private.profiles.get(profile_id: str) -> dict
private.profiles.transfer(json: dict) -> dict
# NOTE:
# - This object is currently undefined
cbpro.private.Oracle(messenger: cbpro.messenger.Messenger)
# NOTE:
# - `cbpro.models.PrivateModel` inherits from `cbpro.models.PublicModel`
cbpro.models.PrivateModel()
Use PrivateModel
to generate passable parameters easily.
Example:
import cbpro
model = cbpro.PrivateModel()
cbpro.models.OrdersModel()
# NOTE:
# - These params are common to all orders
# - You should prefer using the `market` and `limit` methods instead
# - the `side` argument must be of type `str`: 'buy' or 'sell'
# - the `product_id` argument must be of type `str`: i.e. 'BTC-USD'
# - the `type_` argument must be of type `str`: 'limit' or 'market'
model.orders.base(side: str,
product_id: str,
type_: str = None,
client_oid: str = None,
stp: str = None,
stop: str = None,
stop_price: float = None) -> dict
# NOTE:
# - Limit orders are synonymous with Stop orders
# - If you want to place a Stop order, then you can
# use either the `limit` or `market` methods by
# setting the `stop` and `stop_price` arguments
# respectively.
model.orders.limit(side: str,
product_id: str,
price: float,
size: float,
time_in_force: str = None,
cancel_after: str = None,
post_only: bool = None,
client_oid: str = None,
stp: str = None,
stop: str = None,
stop_price: float = None) -> dict
Example:
request = model.orders.limit('buy', 'BTC-USD', 48662.26, 0.001)
print(request)
response = private.orders.post(request)
print(response)
# NOTE:
# - This method requires either a `size` or `funds` argument
model.orders.market(side: str,
product_id: str,
size: float = None,
funds: float = None,
client_oid: str = None,
stp: str = None,
stop: str = None,
stop_price: float = None) -> dict
Example:
request = model.orders.market('buy', 'BTC-USD', funds=100.0)
print(request)
response = private.orders.post(request)
print(response)
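The "either `size` or `funds`" requirement can be expressed in a few lines. The function below is a hypothetical sketch, not the library's `model.orders.market`. It also shows one way to drop unset optional fields so that only explicit values end up in the request body.

```python
def market_order(side, product_id, size=None, funds=None, **optional):
    """Build a market order payload (hypothetical helper, not cbpro's API)."""
    assert side in ('buy', 'sell'), "`side` must be 'buy' or 'sell'"
    assert size is not None or funds is not None, 'either `size` or `funds` is required'
    payload = {'side': side, 'product_id': product_id, 'type': 'market',
               'size': size, 'funds': funds, **optional}
    # Drop unset fields so only explicit values are sent to the API.
    return {name: value for name, value in payload.items() if value is not None}


request = market_order('buy', 'BTC-USD', funds=100.0)
print(request)
# {'side': 'buy', 'product_id': 'BTC-USD', 'type': 'market', 'funds': 100.0}
```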
model.orders.cancel(product_id: str = None) -> dict
Example:
request = model.orders.cancel('LINK-USD')
# request
# {'product_id': 'LINK-USD'}
response = private.orders.cancel(
'eb183792-95df-4b02-987b-b7e91940bed9', request
)
# response
# 'eb183792-95df-4b02-987b-b7e91940bed9'
# NOTE:
# - `status` must be one of:
# 'open', 'pending', 'active', 'done', or 'all'
model.orders.list(status: str, product_id: str = None) -> dict
Example:
request = model.orders.list('open')
response = private.orders.list(request)
for order in response:
    print(order)
cbpro.models.FillsModel()
# NOTE:
# - You are required to provide either a `product_id` or `order_id`
model.fills.list(product_id: str = None, order_id: str = None) -> dict
cbpro.models.DepositsModel()
# NOTE:
# - These parameters are all optional
# - If `type_` is set, then it must be of type str
# where `type` is 'deposit' or 'internal_deposit'
# - If `limit` is set, then it must be of type int
# where `limit` is 0 < limit <= 100
model.deposits.list(type_: str = None,
profile_id: str = None,
before: str = None,
after: str = None,
limit: int = None) -> dict
# NOTE:
# - `payment_id` represents the `payment_method_id` parameter
model.deposits.payment(amount: float,
currency: str,
payment_id: str) -> dict
# NOTE:
# - `coinbase_id` represents the `coinbase_account_id` parameter
model.deposits.coinbase(amount: float,
currency: str,
coinbase_id: str) -> dict
# NOTE:
# - `cbpro.models.WithdrawalsModel` inherits from
# `cbpro.models.DepositsModel`
cbpro.models.WithdrawalsModel()
# NOTE:
# - `address` represents the `crypto_address` parameter
model.withdrawals.crypto(amount: float,
currency: str,
address: str) -> dict
# NOTE:
# - `address` represents the `crypto_address` parameter
model.withdrawals.estimate(currency: str, address: str) -> dict
cbpro.models.ConversionsModel()
model.conversions.post(from_: str,
to: str,
amount: float) -> dict
cbpro.models.ProfilesModel()
model.profiles.list(active: bool) -> dict
model.profiles.transfer(from_: str,
to: str,
currency: str,
amount: float) -> dict
cbpro.websocket.get_message(value: dict = None) -> dict
If you would like to receive real-time market updates, you must subscribe to the Websocket Feed.
Example:
import cbpro
message = cbpro.get_message()
# message
# {
# 'type': 'subscribe',
# 'product_ids': ['BTC-USD'],
# 'channels': ['ticker']
# }
# NOTE:
# - This class is callable
# - Authentication is optional
# - Authentication will result in a couple of benefits
# - Messages where you're one of the parties are expanded
# and have more useful fields
# - You will receive private messages, such as lifecycle
# information about stop orders you placed
cbpro.websocket.WebsocketHeader(key: str,
secret: str,
passphrase: str)
You can also authenticate yourself with the Websocket Feed.
Example:
import cbpro
key = 'My Key'
secret = 'My Secret'
passphrase = 'My Passphrase'
header = cbpro.WebsocketHeader(key, secret, passphrase)
auth = header()
print(auth)
cbpro.websocket.WebsocketStream(header: WebsocketHeader = None,
timeout: int = None,
traceable: bool = False)
Subscribe to a single product
import cbpro
message = cbpro.get_message({
'type': 'subscribe',
'product_ids': ['BTC-USD'],
'channels': ['ticker']
})
stream = cbpro.WebsocketStream()
stream.connect()
stream.send(message)
# Do other stuff...
stream.disconnect()
Subscribe to multiple products
import cbpro
message = cbpro.get_message({
'type': 'subscribe',
'product_ids': ['BTC-USD', 'ETH-USD'],
'channels': ['ticker']
})
stream = cbpro.WebsocketStream()
stream.connect()
stream.send(message)
# Do other stuff...
stream.disconnect()
Example:
import cbpro
key = 'My Key'
secret = 'My Secret'
passphrase = 'My Passphrase'
message = cbpro.get_message()
header = cbpro.WebsocketHeader(key, secret, passphrase)
stream = cbpro.WebsocketStream(header=header, traceable=True)
print(stream.connected)
stream.connect()
stream.send(message)
print(stream.connected)
response = stream.receive()
print(response)
response = stream.receive()
print(response)
stream.disconnect()
# NOTE:
# - This is a read-only property
stream.connected -> bool
stream.connect() -> None
stream.send(params: dict) -> None
stream.receive() -> dict
# NOTE:
# - This method blocks
# - This method sends a keepalive request
# - Ping the connection based on the timeout
stream.ping() -> None
stream.disconnect() -> None
# NOTE:
# - This class and its behavior is subject to change
# - Other methods are being investigated
# - We can use threading.Event (optional)
# - We can use signal (optional)
cbpro.websocket.WebsocketEvent()
The purpose of the WebsocketEvent class is mainly to serve as a template for user defined methods. The current object is for illustration purposes only.
The WebsocketEvent object can be used to define executable methods while the WebsocketClient is manipulating a threading.Thread.
You can inherit from this class and override any of the given methods to implement any desired behavior during a specific point in execution.
The WebsocketEvent.on_listen method is most likely the one you'll care about most, because it is executed during the threading.Thread lifespan.
import cbpro
event = cbpro.WebsocketEvent()
event.on_error(*args, **kwargs) -> object
- called once immediately after the exception is raised
event.on_start(*args, **kwargs) -> object
- called once immediately before the socket connection is made, this is where you want to add initial parameters.
event.on_run(*args, **kwargs) -> object
- called once immediately before the thread is made, this is where you want to add initial parameters.
event.on_stop(*args, **kwargs) -> object
- called once immediately before the websocket is closed.
event.on_listen(*args, **kwargs) -> object
- called once for every message received if the message resolves to True
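The hook pattern listed above can be tried without a live connection. The sketch below uses hypothetical stand-ins: `Event` imitates a template class with overridable hooks, `listen` imitates what a listener thread might do, and a plain list plays the role of the websocket stream. It does not reproduce the actual `WebsocketClient` internals.

```python
import threading


class Event:
    """Stand-in for a WebsocketEvent style template with overridable hooks."""

    def on_start(self):
        pass

    def on_listen(self, message):
        pass

    def on_stop(self):
        pass


class CollectPrices(Event):
    def __init__(self):
        self.seen = []

    def on_listen(self, message):
        # Called once per non-empty message.
        self.seen.append(message['price'])


def listen(messages, event):
    """Drive the hooks the way a listener thread might."""
    event.on_start()
    for message in messages:
        if message:  # skip messages that resolve to False
            event.on_listen(message)
    event.on_stop()


event = CollectPrices()
fake_stream = [{'price': '57000.01'}, {}, {'price': '57000.02'}]
thread = threading.Thread(target=listen, args=(fake_stream, event))
thread.start()
thread.join()
print(event.seen)  # ['57000.01', '57000.02']
```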
# NOTE:
# - This class and its behavior is subject to change
# - Other methods are being investigated
# - We can use threading.Event (optional)
# - We can use signal (optional)
cbpro.websocket.WebsocketClient(stream: WebsocketStream,
event: WebsocketEvent = None,
collection: pymongo.collection.Collection = None)
The WebsocketClient subscribes in a separate thread upon initialization.
You can override the WebsocketEvent methods before initialization so that the client can react to the data streaming in. The current client is a template used for illustration purposes only.
import threading
import time
import cbpro


class CustomEvent(cbpro.WebsocketEvent):
    def __init__(self):
        # initialize the inherited methods
        super(CustomEvent, self).__init__()

    # this method is overridden even though it reimplements the
    # default behavior
    def on_run(self):
        print(f'[Run] {threading.active_count()} active threads')
        time.sleep(1)


message = cbpro.get_message()
stream = cbpro.WebsocketStream()
event = CustomEvent()
client = cbpro.WebsocketClient(stream, event=event)
client.run(message)
time.sleep(15)
client.stop()
The WebsocketClient supports data gathering via pymongo. Given a pymongo.collection.Collection, the WebsocketClient will stream results directly into the database collection.
import time
import pymongo
import cbpro
# connect to a local, running, Mongo instance
mongo = pymongo.MongoClient('mongodb://localhost:27017/')
# specify the database and collection
database = mongo.cryptocurrency_database
bitcoin_collection = database.bitcoin_collection
# instantiate a WebsocketClient instance with Mongo collection as parameter
message = cbpro.get_message()
stream = cbpro.WebsocketStream()
client = cbpro.WebsocketClient(stream, collection=bitcoin_collection)
client.run(message)
time.sleep(15)
client.stop()
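When no Mongo instance is available, for example in tests, the same flow can be exercised with an in-memory object. The sketch below is an assumption about how messages could be routed: `MemoryCollection` is a hypothetical stand-in exposing only `insert_one` (a method that `pymongo.collection.Collection` also provides), and `store_messages` is not part of `cbpro`.

```python
class MemoryCollection:
    """In-memory stand-in exposing the one pymongo Collection method used here."""

    def __init__(self):
        self.documents = []

    def insert_one(self, document):
        # Store a copy so later changes to the message do not alter the record.
        self.documents.append(dict(document))


def store_messages(messages, collection=None):
    """Insert each message when a collection is given, otherwise print it."""
    for message in messages:
        if collection is None:
            print(message)
        else:
            collection.insert_one(message)


collection = MemoryCollection()
store_messages([{'type': 'ticker', 'price': '57000.01'}], collection)
print(len(collection.documents))  # 1
```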
client.listen(*args, **kwargs) -> object
- listen to received stream messages
- this is called once at the end of client.start after the stream is connected
client.start(*args, **kwargs) -> object
- starts the stream and calls listen
- this is where you want to add initial parameters
client.run(*args, **kwargs) -> object
- create and start the thread
- this is where you want to add initial parameters
client.stop(*args, **kwargs) -> object
- disconnect from the stream and join the thread
cbpro.websocket.websocket_client(key: str = None,
secret: str = None,
passphrase: str = None,
event: WebsocketEvent = None,
collection: pymongo.collection.Collection = None,
traceable: bool = False) -> WebsocketClient
A test suite is under development. Tests for the authenticated client require a set of sandbox API credentials. To provide them, rename api_config.json.example in the tests folder to api_config.json and edit the file accordingly. To run the tests, start in the project directory and run:
python -m pytest
The OrderBook subscribes to a websocket and keeps a real-time record of the order book for the product_id input. Please provide your feedback for future improvements.
import cbpro, time
order_book = cbpro.OrderBook(product_id='BTC-USD')
order_book.start()
time.sleep(10)
order_book.close()
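As a rough picture of what keeping a real-time record involves, the sketch below maintains a price-to-size book from messages shaped like those of the Coinbase Pro `level2` channel (`snapshot` followed by `l2update`). `Level2Book` is a hypothetical class for illustration and the messages are made up. The library's `OrderBook` may use a different feed and data structure.

```python
from decimal import Decimal


class Level2Book:
    """Minimal price -> size book built from level2 channel messages."""

    def __init__(self):
        self.bids = {}
        self.asks = {}

    def apply(self, message):
        if message['type'] == 'snapshot':
            # A snapshot replaces the whole book.
            self.bids = {Decimal(p): Decimal(s) for p, s in message['bids']}
            self.asks = {Decimal(p): Decimal(s) for p, s in message['asks']}
        elif message['type'] == 'l2update':
            for side, price, size in message['changes']:
                book = self.bids if side == 'buy' else self.asks
                price, size = Decimal(price), Decimal(size)
                if size == 0:
                    book.pop(price, None)  # a size of zero removes the level
                else:
                    book[price] = size

    def best_bid(self):
        return max(self.bids) if self.bids else None

    def best_ask(self):
        return min(self.asks) if self.asks else None


book = Level2Book()
book.apply({'type': 'snapshot',
            'bids': [['57000.00', '0.5'], ['56999.50', '1.2']],
            'asks': [['57000.50', '0.3'], ['57001.00', '2.0']]})
book.apply({'type': 'l2update',
            'changes': [['buy', '57000.00', '0'], ['sell', '57000.25', '0.1']]})
print(book.best_bid(), book.best_ask())  # 56999.50 57000.25
```

Prices are parsed as `Decimal` rather than `float` so that price levels compare exactly as the exchange sent them.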
Unit tests are under development using the pytest framework. Contributions are welcome!
To run the full test suite, in the project directory run:
python -m pytest