Skip to content
This repository has been archived by the owner on Feb 1, 2024. It is now read-only.

Commit

Permalink
binance web-sockets: Add GetOrderBook() (#718) (part of #715)
Browse files Browse the repository at this point in the history
* add GetOrderBook

* add more comments

* move comment on next line

* patch/resolve comments

* patch/ fixes

* patch/ increase waitTime

* add timeout instead of waiting

* patch/ remove timeout

* patch/ check for len > fetchSize
  • Loading branch information
tibrn authored Jul 18, 2021
1 parent 9dcdff5 commit 10994b1
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 20 deletions.
204 changes: 184 additions & 20 deletions plugins/binanceExchange_ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,27 @@ import (
"time"

"github.com/adshao/go-binance/v2"
"github.com/adshao/go-binance/v2/common"
"github.com/stellar/kelp/api"
"github.com/stellar/kelp/model"
)

const (
STREAM_TICKER_FMT = "%s@ticker"
STREAM_BOOK_FMT = "%s@depth"
TTLTIME = time.Second * 3 // ttl time in seconds
)

var (
timeToWaitForFirstEvent = time.Second
timeWaitForFirstEvent = time.Second * 2
)

var (
ErrConversionWsMarketEvent = errConversion{from: "interface", to: "*binance.WsMarketStatEvent"}
ErrConversionWsMarketEvent = errConversion{from: "interface", to: "*binance.WsMarketStatEvent"}
ErrConversionWsPartialDepthEvent = errConversion{from: "interface", to: "*binance.WsPartialDepthEvent"}
)

type Subscriber func(symbol string, state *mapEvents) (*stream, error)
type errMissingSymbol struct {
symbol string
}
Expand Down Expand Up @@ -135,17 +139,21 @@ func makeMapEvents() *mapEvents {
//struct used to keep all cached data
type events struct {
SymbolStats *mapEvents
BookStats *mapEvents
}

func createStateEvents() *events {
events := &events{
SymbolStats: makeMapEvents(),
BookStats: makeMapEvents(),
}

return events
}

// subscribe for symbol@ticker
// 24hr rolling window ticker statistics for a single symbol. These are NOT the statistics of the UTC day, but a 24hr rolling window for the previous 24hrs.
// Stream Name: <symbol>@ticker
// Update Speed: 1000ms
func subcribeTicker(symbol string, state *mapEvents) (*stream, error) {

wsMarketStatHandler := func(ticker *binance.WsMarketStatEvent) {
Expand All @@ -162,6 +170,50 @@ func subcribeTicker(symbol string, state *mapEvents) (*stream, error) {
return nil, err
}

keepConnection(doneC, func() {
subcribeTicker(symbol, state)
})

return &stream{doneC: doneC, stopC: stopC, cleanup: func() {
state.Del(symbol)
}}, err

}

//restart Connection with ws
// Binance close each connection after 24 hours
func keepConnection(doneC chan struct{}, reconnect func()) {

go func() {
<-doneC
reconnect()
}()
}

// Top <levels> bids and asks, pushed every second. Valid <levels> are 5, 10, or 20.
// <symbol>@depth<levels>@100ms
// 100ms
func subcribeBook(symbol string, state *mapEvents) (*stream, error) {

wsPartialDepthHandler := func(event *binance.WsPartialDepthEvent) {
state.Set(symbol, event)
}

errHandler := func(err error) {
log.Printf("Error WsPartialDepthServe for symbol %s: %v\n", symbol, err)
}

//Subscribe to highest level
doneC, stopC, err := binance.WsPartialDepthServe100Ms(symbol, "20", wsPartialDepthHandler, errHandler)

if err != nil {
return nil, err
}

keepConnection(doneC, func() {
subcribeBook(symbol, state)
})

return &stream{doneC: doneC, stopC: stopC, cleanup: func() {
state.Del(symbol)
}}, err
Expand Down Expand Up @@ -209,6 +261,35 @@ func getPrecision(floatStr string) int8 {
return int8(len(strs[1]))
}

//subscribeStream and wait for the first event
func (beWs *binanceExchangeWs) subscribeStream(symbol, format string, subscribe Subscriber, state *mapEvents) (mapData, error) {

stream, err := subscribe(symbol, state)

streamName := fmt.Sprintf(format, symbol)

if err != nil {
return mapData{}, fmt.Errorf("error when subscribing for %s: %s", streamName, err)
}

//Store stream
beWs.streamLock.Lock()
beWs.streams[streamName] = stream
beWs.streamLock.Unlock()

//Wait for binance to send events
time.Sleep(timeWaitForFirstEvent)

data, isStream := state.Get(symbol)

//We couldn't subscribe for this pair
if !isStream {
return mapData{}, fmt.Errorf("error while subscribing for %s", streamName)
}

return data, nil
}

// GetTickerPrice impl.
func (beWs *binanceExchangeWs) GetTickerPrice(pairs []model.TradingPair) (map[model.TradingPair]api.Ticker, error) {

Expand All @@ -224,25 +305,10 @@ func (beWs *binanceExchangeWs) GetTickerPrice(pairs []model.TradingPair) (map[mo
tickerData, isTicker := beWs.events.SymbolStats.Get(symbol)

if !isTicker {
stream, err := subcribeTicker(symbol, beWs.events.SymbolStats)
tickerData, err = beWs.subscribeStream(symbol, STREAM_TICKER_FMT, subcribeTicker, beWs.events.SymbolStats)

if err != nil {
return nil, fmt.Errorf("error when subscribing for %s: %s", symbol, err)
}

//Store stream
beWs.streamLock.Lock()
beWs.streams[fmt.Sprintf(STREAM_TICKER_FMT, symbol)] = stream
beWs.streamLock.Unlock()

//Wait for binance to send events
time.Sleep(timeToWaitForFirstEvent)

tickerData, isTicker = beWs.events.SymbolStats.Get(symbol)

//We couldn't subscribe for this pair
if !isTicker {
return nil, fmt.Errorf("error while fetching ticker price for trading pair %s", symbol)
return nil, err
}

}
Expand Down Expand Up @@ -286,6 +352,104 @@ func (beWs *binanceExchangeWs) GetTickerPrice(pairs []model.TradingPair) (map[mo
return priceResult, nil
}

//GetOrderBook impl
func (beWs *binanceExchangeWs) GetOrderBook(pair *model.TradingPair, maxCount int32) (*model.OrderBook, error) {

var (
fetchSize = int(maxCount)
)

if fetchSize > 20 {
return nil, fmt.Errorf("Max supported depth level is 20")
}

symbol, err := pair.ToString(beWs.assetConverter, beWs.delimiter)
if err != nil {
return nil, fmt.Errorf("error converting pair to string: %s", err)
}

bookData, isBook := beWs.events.BookStats.Get(symbol)

if !isBook {

bookData, err = beWs.subscribeStream(symbol, STREAM_BOOK_FMT, subcribeBook, beWs.events.BookStats)

if err != nil {
return nil, err
}

}

//Show how old is the orderbook
log.Printf("OrderBook for %s is %d milliseconds old!\n", symbol, time.Now().Sub(bookData.createdAt).Milliseconds())

if isStale(bookData, TTLTIME) {
return nil, fmt.Errorf("ticker for %s symbols is older than %v", symbol, TTLTIME)
}

bookI := bookData.data

//Convert to WsMarketStatEvent
book, isOk := bookI.(*binance.WsPartialDepthEvent)

if !isOk {
return nil, ErrConversionWsPartialDepthEvent
}

askCcxtOrders := book.Asks
bidCcxtOrders := book.Bids

if len(askCcxtOrders) > fetchSize {
askCcxtOrders = askCcxtOrders[:fetchSize]

}

if len(bidCcxtOrders) > fetchSize {
bidCcxtOrders = bidCcxtOrders[:fetchSize]
}

asks, err := beWs.readOrders(askCcxtOrders, pair, model.OrderActionSell)

if err != nil {
return nil, err
}

bids, err := beWs.readOrders(bidCcxtOrders, pair, model.OrderActionBuy)

if err != nil {
return nil, err
}

return model.MakeOrderBook(pair, asks, bids), nil
}

//readOrders... transform orders from binance to model.Order
func (beWs *binanceExchangeWs) readOrders(orders []common.PriceLevel, pair *model.TradingPair, orderAction model.OrderAction) ([]model.Order, error) {

pricePrecision := getPrecision(orders[0].Price)
volumePrecision := getPrecision(orders[0].Quantity)

result := []model.Order{}
for _, o := range orders {

price, quantity, err := o.Parse()

if err != nil {
return nil, err
}

result = append(result, model.Order{
Pair: pair,
OrderAction: orderAction,
OrderType: model.OrderTypeLimit,
Price: model.NumberFromFloat(price, pricePrecision),
Volume: model.NumberFromFloat(quantity, volumePrecision),
Timestamp: nil,
})
}
return result, nil
}

//Unsubscribe ... unsubscribe from binance streams
func (beWs *binanceExchangeWs) Unsubscribe(stream string) {

Expand Down
41 changes: 41 additions & 0 deletions plugins/binanceExchange_ws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,44 @@ func Test_binanceExchangeWs_GetTickerPrice(t *testing.T) {
return
}
}

func Test_binanceExchangeWs_GetOrderBook(t *testing.T) {

testBinanceExchangeWs, e := makeBinanceWs()
if !assert.NoError(t, e) {
return
}

for _, obDepth := range []int32{1, 5, 8, 10, 15, 16, 20} {

pair := model.TradingPair{Base: model.XLM, Quote: model.BTC}
ob, e := testBinanceExchangeWs.GetOrderBook(&pair, obDepth)
if !assert.NoError(t, e) {
return
}
assert.Equal(t, ob.Pair(), &pair)

if !assert.True(t, len(ob.Asks()) > 0, len(ob.Asks())) {
return
}
if !assert.True(t, len(ob.Bids()) > 0, len(ob.Bids())) {
return
}

if !assert.True(t, len(ob.Asks()) <= int(obDepth), fmt.Sprintf("asks should be <= %d", obDepth)) {
return
}
if !assert.True(t, len(ob.Bids()) <= int(obDepth), fmt.Sprintf("bids should be <= %d", obDepth)) {
return
}

assert.True(t, ob.Asks()[0].OrderAction.IsSell())
assert.True(t, ob.Asks()[0].OrderType.IsLimit())
assert.True(t, ob.Bids()[0].OrderAction.IsBuy())
assert.True(t, ob.Bids()[0].OrderType.IsLimit())
assert.True(t, ob.Asks()[0].Price.AsFloat() > 0)
assert.True(t, ob.Asks()[0].Volume.AsFloat() > 0)
assert.True(t, ob.Bids()[0].Price.AsFloat() > 0)
assert.True(t, ob.Bids()[0].Volume.AsFloat() > 0)
}
}

0 comments on commit 10994b1

Please sign in to comment.