Create A Cryptocurrency Trading Bot in Elixir
Kamil Skowron
This book is for sale at [Link]
This is a Leanpub book. Leanpub empowers authors and publishers with the Lean Publishing
process. Lean Publishing is the act of publishing an in-progress ebook using lightweight tools and
many iterations to get reader feedback, pivot until you have the right book and build traction once
you do.
Preface
Source code
Chapter 2 - Create a naive trading strategy - a single trader process without supervision
  Objectives
  Initialization
  How trading strategy will work?
Chapter 14 - Store trade events and orders inside the database
  Objectives
  Overview of requirements
  Create a new data_warehouse application in the umbrella
  Connect to the database using Ecto
  Store trade events’ data
  Store orders’ data
  Implement supervision
cd hedgehog/apps
mix new streamer --sup

# /apps/streamer/[Link]
defp deps do
  [
    {:websockex, "~> 0.4.2"}
  ]
end

# WebSockex's readme
defmodule WebSocketExample do
  use WebSockex

  def start_link(url, state) do
    WebSockex.start_link(url, __MODULE__, state)
  end

  def handle_frame({type, msg}, state) do
    [Link] "Received Message - Type: #{inspect type} -- Message: #{inspect msg}"
    {:ok, state}
  end

  def handle_cast({:send, {type, msg} = frame}, state) do
    [Link] "Sending #{type} frame with payload: #{msg}"
    {:reply, frame, state}
  end
end
We will copy the whole code above across to our new [Link] file.
The first step will be to update the module name to match our file name:
# /apps/streamer/lib/streamer/[Link]
defmodule [Link] do
In the spirit of keeping things tidy - we will now remove the handle_cast/2 function (last function
in our module) as we won’t be sending any messages back to Binance via WebSocket (to place orders
etc - Binance provides a REST API which we will use in the next chapter).
⁴[Link]
Chapter 1 - Stream live crypto prices from Binance WSS 10
Next, let’s look up which URL we should use to connect to Binance’s API. Binance has separate
WSS (Web Socket Streams) documentation on GitHub⁵.
Scrolling down we can see the General WSS information section where 3 important pieces of
information are listed:
We can see that the full endpoint for raw streams (we will be using a “raw” stream) will be
[Link] with the stream name at the end (together with the lowercased
symbol).
Note: In the context of the Binance API, “raw” means that no aggregation was performed before
broadcasting the data on the WebSocket.
Let’s introduce a module attribute that will hold the full raw stream endpoint which will be used
across the module:
# /apps/streamer/lib/streamer/[Link]
@stream_endpoint "[Link]
Now back in Binance’s WSS documentation⁶ we need to search for “Trade Streams”. A “trade” in the
context of this documentation means an exchange of assets (coins/tokens) between two sides (buyer
and seller). Our future trading strategy will be interested in the “latest price”, which is simply
the last trade event’s price.
We can see that docs are pointing to the following stream name:
⁵[Link]
⁶[Link]
# /apps/streamer/lib/streamer/[Link]
def start_link(symbol) do
  symbol = [Link](symbol)

  WebSockex.start_link(
    "#{@stream_endpoint}#{symbol}@trade",
    __MODULE__,
    nil
  )
end
Instead of passing a URL, we modified the function to accept a symbol, downcase it and use it
together with the module’s @stream_endpoint attribute to build the full URL.
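The URL construction can be tried in isolation. Below is a minimal sketch, assuming a placeholder endpoint (the real Binance endpoint is not repeated here) and a hypothetical StreamUrl module that is not part of the streamer application:

```elixir
defmodule StreamUrl do
  # Placeholder endpoint used only for this sketch, not the real Binance URL.
  @stream_endpoint "wss://example.invalid/ws/"

  # Builds "<endpoint><lowercased symbol>@trade".
  def build(symbol) when is_binary(symbol) do
    "#{@stream_endpoint}#{String.downcase(symbol)}@trade"
  end
end

IO.puts(StreamUrl.build("XRPUSDT"))
# prints: wss://example.invalid/ws/xrpusdt@trade
```

Downcasing inside the function means that callers can pass either "XRPUSDT" or "xrpusdt" and end up with the same stream name.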
At this point, streaming of trade events already works, which we can test using iex:
$ iex -S mix
...
iex(1)> [Link].start_link("xrpusdt")
{:ok, #PID<0.335.0>}
Received Message - Type: :text -- Message: "{\"e\":\"trade\",\"E\":1603226394741,\"s\
\":\"XRPUSDT\",\"t\":74608889,\"p\":\"0.24373000\",\"q\":\"200.00000000\",\"b\":9482\
44411,\"a\":948244502,\"T\":1603226394739,\"m\":true,\"M\":true}"
Received Message - Type: :text -- Message: "{\"e\":\"trade\",\"E\":1603226394741,\"s\
\":\"XRPUSDT\",\"t\":74608890,\"p\":\"0.24372000\",\"q\":\"143.20000000\",\"b\":9482\
44412,\"a\":948244502,\"T\":1603226394739,\"m\":true,\"M\":true}"
We can see the messages logged above because we copied the sample implementation from
WebSockex’s readme⁷ where handle_frame/2 function uses [Link]/1 to print out all incoming
data. The lesson here is that every incoming message from Binance will cause the handle_frame/2
callback to be called with the message and the process’ state.
Just for reference, our module should look currently as follows:
⁷[Link]
# /apps/streamer/lib/streamer/[Link]
defmodule [Link] do
  use WebSockex

  @stream_endpoint "[Link]

  def start_link(symbol) do
    symbol = [Link](symbol)

    WebSockex.start_link(
      "#{@stream_endpoint}#{symbol}@trade",
      __MODULE__,
      nil
    )
  end

  def handle_frame({type, msg}, state) do
    [Link] "Received Message - Type: #{inspect type} -- Message: #{inspect msg}"
    {:ok, state}
  end
end

# /apps/streamer/[Link]
defp deps do
  [
    {:jason, "~> 1.2"},
    {:websockex, "~> 0.4.2"}
  ]
end
As previously, don’t forget to run mix [Link] to fetch the new dependency.
⁸[Link]
Looking through the documentation of the Jason module we can see the encode!/2 and decode!/2
functions. Both of them have exclamation marks, which indicates that they will raise an error
whenever they are unable to successfully encode or decode the passed value.
This is less than perfect for our use case as we would like to handle those errors in our own
way (technically we could just use try/rescue, but as we will find out, both encode/2 and decode/2
are available).
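The same naming convention runs through Elixir’s standard library, so it can be illustrated without any dependency. The sketch below uses Date.from_iso8601/1 and Date.from_iso8601!/1 as stand-ins; it only shows the general convention and does not involve Jason:

```elixir
# Non-bang version: returns a tagged tuple that the caller can pattern match on.
{:ok, date} = Date.from_iso8601("2020-10-20")
{:error, reason} = Date.from_iso8601("not-a-date")

# Bang version: returns the bare value on success...
same_date = Date.from_iso8601!("2020-10-20")

# ...and raises on failure, so the caller has to rescue the error.
raised? =
  try do
    Date.from_iso8601!("not-a-date")
    false
  rescue
    ArgumentError -> true
  end

IO.inspect({date, reason, same_date, raised?})
```

With the tuple-returning variant, the error path becomes an ordinary branch of a case expression instead of an exception handler.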
We will go a little bit off-topic but I would highly recommend those sorts of journeys around
somebody’s code. Let’s look inside the Jason⁹ module. Scrolling down in search of decode/2 (without
the exclamation mark) we can see it about line 54:
# /lib/[Link]
def decode(input, opts \\ []) do
  input = IO.iodata_to_binary(input)
  [Link](input, format_decode_opts(opts))
end
It looks like it uses the parse/2 function of a Decoder module, let’s scroll back up and check where
it’s coming from. At line 6:
# /lib/[Link]
alias Jason.{Encode, Decoder, DecodeError, EncodeError, Formatter}
we can see that Decoder is an alias of the [Link]¹⁰. Scrolling down to the [Link]
module we will find a parse/2 function about line 43:
# /lib/[Link]
def parse(data, opts) when is_binary(data) do
  key_decode = key_decode_function(opts)
  string_decode = string_decode_function(opts)
  try do
    value(data, data, 0, [@terminate], key_decode, string_decode)
  catch
    {:position, position} ->
      {:error, %DecodeError{position: position, data: data}}
    {:token, token, position} ->
      {:error, %DecodeError{token: token, position: position, data: data}}
  else
    value ->
      {:ok, value}
  end
end
⁹[Link]
¹⁰[Link]
Based on the result of decoding, it will return either {:ok, value} or {:error, %[Link]{...}}.
We can confirm that by digging through the documentation of the module on hex¹¹.
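The try/catch/else shape used by parse/2 can be reproduced on a much smaller scale. The following is a minimal sketch with a hypothetical TinyParser module that accepts only strings of digits; it is not Jason’s code, it only mirrors the control flow (a throw from deep inside the recursion is converted into an error tuple, and a normal return is wrapped in {:ok, value}):

```elixir
defmodule TinyParser do
  # Hypothetical parser for this sketch: accepts only strings of ASCII digits.
  def parse(data) when is_binary(data) do
    try do
      digits(data, 0, 0)
    catch
      # A throw carrying the position of the offending byte becomes an error tuple.
      {:position, position} -> {:error, {:unexpected_byte, position}}
    else
      # Reached only when the body returned without throwing.
      value -> {:ok, value}
    end
  end

  defp digits(<<>>, _position, acc), do: acc

  defp digits(<<char, rest::binary>>, position, acc) when char in ?0..?9 do
    digits(rest, position + 1, acc * 10 + (char - ?0))
  end

  defp digits(_other, position, _acc), do: throw({:position, position})
end

IO.inspect(TinyParser.parse("123"))
IO.inspect(TinyParser.parse("12x4"))
```

Throwing lets the recursive helper bail out immediately without threading an error value through every clause, while the public function still exposes a plain tuple-based interface.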
Once again, the point of this lengthy investigation was to show that Elixir code is readable and
easy to understand, so don’t be thrown off when documentation is a little bit light. Quite the
opposite: contribute to docs and code as you gain a better understanding of the codebase.
We can now get back to our [Link] module and modify the handle_frame/2 function to
decode the incoming JSON message. Based on the result of [Link]/2 we will either call the
process_event/1 function or log an error. Here’s the new version of the handle_frame/2 function:
# /apps/streamer/lib/streamer/[Link]
def handle_frame({_type, msg}, state) do
  case [Link](msg) do
    {:ok, event} -> process_event(event)
    {:error, _} -> [Link]("Unable to parse msg: #{msg}")
  end

  {:ok, state}
end
Please note that type is now prefixed with an underscore as we aren’t using it at the moment.
The second important thing to note is that we are using Logger, so it needs to be required at the
beginning of the module:
# /apps/streamer/lib/streamer/[Link]
require Logger
Before implementing the process_event/1 function we need to create a structure that will hold the
incoming trade event’s data.
Let’s create a new directory called binance inside apps/streamer/lib/streamer/ and a new file
called trade_event.ex inside it.
Our new module will hold all of the trade event’s information, but we will use readable field
names (you will see the incoming data below). We can start by writing a skeleton module:
# /apps/streamer/lib/streamer/binance/trade_event.ex
defmodule [Link] do
  defstruct []
end

{
  "e": "trade",     // Event type
  "E": 123456789,   // Event time
  "s": "BNBBTC",    // Symbol
  "t": 12345,       // Trade ID
  "p": "0.001",     // Price
  "q": "100",       // Quantity
  "b": 88,          // Buyer order ID
  "a": 50,          // Seller order ID
  "T": 123456785,   // Trade time
  "m": true,        // Is the buyer the market maker?
  "M": true         // Ignore
}
Let’s copy the fields across and convert the comments into field names, updating the defstruct
inside the [Link] module to the following:
# /apps/streamer/lib/streamer/binance/trade_event.ex
defstruct [
  :event_type,
  :event_time,
  :symbol,
  :trade_id,
  :price,
  :quantity,
  :buyer_order_id,
  :seller_order_id,
  :trade_time,
  :buyer_market_maker
]
That’s all for this struct. We can now get back to implementing the process_event/1 function
inside the [Link] module. We will map every field of the response map to the
%[Link] struct. A useful trick here is to copy the list of fields once
again from the struct and assign the incoming fields one by one.
Inside the header of the function, we will pattern match on the event type (a field called “e” in
the message) to confirm that we indeed received a trade event. In the end, the process_event/1
function should look as follows:
# /apps/streamer/lib/streamer/[Link]
defp process_event(%{"e" => "trade"} = event) do
  trade_event = %[Link]{
    :event_type => event["e"],
    :event_time => event["E"],
    :symbol => event["s"],
    :trade_id => event["t"],
    :price => event["p"],
    :quantity => event["q"],
    :buyer_order_id => event["b"],
    :seller_order_id => event["a"],
    :trade_time => event["T"],
    :buyer_market_maker => event["m"]
  }

  [Link](
    "Trade event received " <>
      "#{trade_event.symbol}@#{trade_event.price}"
  )
end
We added the [Link]/2 function to be able to see logs of incoming trade events.
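The mapping step can be exercised without a WebSocket connection. Below is a minimal sketch using a trimmed-down, hypothetical Sketch.TradeEvent struct (only four of the fields) and a Sketch.Mapper module; both names are made up for this example and the fallback clause is an addition that the function above does not have:

```elixir
defmodule Sketch.TradeEvent do
  # Trimmed-down version of the struct, enough to show the mapping.
  defstruct [:event_type, :symbol, :price, :quantity]
end

defmodule Sketch.Mapper do
  # Matches only decoded maps whose "e" key is "trade".
  def to_struct(%{"e" => "trade"} = event) do
    {:ok,
     %Sketch.TradeEvent{
       event_type: event["e"],
       symbol: event["s"],
       price: event["p"],
       quantity: event["q"]
     }}
  end

  # Fallback added for this sketch: any other payload is ignored.
  def to_struct(_other), do: :ignored
end

decoded = %{"e" => "trade", "s" => "XRPUSDT", "p" => "0.24373000", "q" => "200.00000000"}
IO.inspect(Sketch.Mapper.to_struct(decoded))
```

Note that price and quantity stay as strings here, exactly as they arrive in the JSON payload.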
Lastly, before testing our implementation, let’s add a nice interface to our streamer application
that allows us to start streaming:
# /apps/streamer/lib/[Link]
defmodule Streamer do
  @moduledoc """
  Documentation for `Streamer`.
  """

  def start_streaming(symbol) do
    [Link].start_link(symbol)
  end
end
The final version of the [Link] module should look like this¹³.
Last step will be to add the Logger configuration into main config/[Link] file. We will set the
Logger level to :debug for a moment to be able to see incoming trade events:
¹³[Link]
# /config/[Link]
config :logger,
  level: :debug
This finishes the implementation part of this chapter, we can now give our implementation a whirl
using iex:
$ iex -S mix
...
iex(1)> Streamer.start_streaming("xrpusdt")
{:ok, #PID<0.251.0>}
[Link].217 [debug] Trade event received [email protected]
[Link].381 [debug] Trade event received [email protected]
[Link].380 [debug] Trade event received [email protected]
[Link].386 [debug] Trade event received [email protected]
As we can see, the streamer is establishing a WebSocket connection with Binance’s API and it’s
receiving trade events. It decodes them from JSON to the %[Link] struct and
logs a compiled message. Also, our interface hides implementation details from the “user” of our
application.
We will now flip the Logger level back to info so the output won’t include every incoming trade event:
# /config/[Link]
config :logger,
  level: :info
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github¹⁴
¹⁴[Link]
Chapter 2 - Create a naive trading strategy - a single trader process without supervision
Objectives
• create another supervised application inside umbrella to store our trading strategy
• define callbacks for events dependent on state of the trader
• push events from the streamer app to the naive app
Initialization
To develop our naive strategy, we will need to create a new supervised application inside our
umbrella project:
cd apps
mix new naive --sup
We can now focus on creating a trader abstraction inside that newly created application. First we
need to create a new file called [Link] inside apps/naive/lib/naive/.
Let’s start with a skeleton of a GenServer:
# /apps/naive/lib/naive/[Link]
defmodule [Link] do
  use GenServer

  require Logger

  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: :trader)
  end

  def init(args) do
    {:ok, args}
  end
end
Our module uses the GenServer¹⁵ behaviour and, to fulfill its “contract”, we need to implement the
init/1 function. The start_link/1 function is a convention and it allows us to register the process
with a name (it’s the default function that a supervisor will use when starting the Trader). We
also add a require Logger as we will keep on logging across the module.
Next, let’s model the state of our server:
# /apps/naive/lib/naive/[Link]
defmodule State do
  @enforce_keys [:symbol, :profit_interval, :tick_size]
  defstruct [
    :symbol,
    :buy_order,
    :sell_order,
    :profit_interval,
    :tick_size
  ]
end
• what symbol does it need to trade (“symbol” here is a pair of assets for example “XRPUSDT”,
which is XRP to/from USDT)
• placed buy order (if any)
• placed sell order (if any)
• profit interval (what net profit % we would like to achieve when buying and selling an asset -
single trade cycle)
• tick_size (yes, I know, jargon warning. We can’t ignore it here as it needs to be fetched from
Binance and it’s used to calculate a valid price. Tick size differs between symbols and it is the
smallest acceptable price movement up or down. For example, in the physical world the tick size for
USD is a single cent: you can’t sell something for $1.234, it’s either $1.23 or $1.24 (the one cent
difference between those is the tick size) - more info here¹⁶)
Our strategy won’t be able to work without symbol, profit_interval or tick_size, so we added them
to the @enforce_keys attribute. This will ensure that we won’t create an invalid %State{} without
those values.
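The effect of @enforce_keys can be observed at runtime with struct!/2. The sketch below uses a hypothetical Sketch.State module with the same fields; the values passed in are placeholders:

```elixir
defmodule Sketch.State do
  @enforce_keys [:symbol, :profit_interval, :tick_size]
  defstruct [:symbol, :buy_order, :sell_order, :profit_interval, :tick_size]
end

# All enforced keys are given: the struct is built and the other fields default to nil.
state =
  struct!(Sketch.State, symbol: "XRPUSDT", profit_interval: "0.01", tick_size: "0.0001")

# :tick_size is missing: struct!/2 raises an ArgumentError.
result =
  try do
    struct!(Sketch.State, symbol: "XRPUSDT", profit_interval: "0.01")
  rescue
    error in ArgumentError -> {:error, Exception.message(error)}
  end

IO.inspect(state)
IO.inspect(result)
```

When the struct is written literally as %State{...} in module code, the same check happens at compile time, so a forgotten key is caught before the code ever runs.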
As now we know that our GenServer will need to receive those details via args, we can update
pattern matching in start_link/1 and init/1 functions to confirm that passed values are indeed
maps:
¹⁵[Link]
¹⁶[Link]
# /apps/naive/lib/naive/[Link]
def start_link(%{} = args) do
  ...
end

def init(%{symbol: symbol, profit_interval: profit_interval}) do
  ...
end
As we are already in the init/1 function we will need to modify it to fetch the tick_size for the
passed symbol and initialize a fresh state:
# /apps/naive/lib/naive/[Link]
def init(%{symbol: symbol, profit_interval: profit_interval}) do
  symbol = [Link](symbol)

  [Link]("Initializing new trader for #{symbol}")

  tick_size = fetch_tick_size(symbol)

  {:ok,
   %State{
     symbol: symbol,
     profit_interval: profit_interval,
     tick_size: tick_size
   }}
end
We are uppercasing the symbol above as Binance’s REST API accepts only uppercased symbols.
It’s time to connect to Binance’s REST API. The easiest way to do that will be to use the binance¹⁷
module.
As previously, looking through the module’s docs on Github, we can see the Installation
section. We will follow the steps mentioned there, starting from adding binance to the deps in
/apps/naive/[Link]:
¹⁷[Link]
# /apps/naive/[Link]
defp deps do
  [
    {:binance, "~> 0.7.1"},
    {:decimal, "~> 2.0"},
    {:streamer, in_umbrella: true}
  ]
end
Besides adding the :binance module, we also added :decimal and :streamer. The decimal¹⁸
module will help us to calculate the buy and sell prices (without the decimal module we would have
problems with precision). Lastly, we need to include the :streamer application (created in the first
chapter) as we will use the %[Link]{} struct inside the naive application.
We need to run mix [Link] to install our new deps.
We can now get back to the trader module and focus on fetching the tick size from Binance:
# /apps/naive/lib/naive/[Link]
defp fetch_tick_size(symbol) do
  Binance.get_exchange_info()
  |> elem(1)
  |> [Link](:symbols)
  |> [Link](&(&1["symbol"] == symbol))
  |> [Link]("filters")
  |> [Link](&(&1["filterType"] == "PRICE_FILTER"))
  |> [Link]("tickSize")
  |> [Link]()
end
We are using get_exchange_info/0 to fetch the list of symbols, which we will filter to find the
symbol that we are requested to trade. Tick size is defined as a PRICE_FILTER filter. Here’s the
link¹⁹ to the documentation listing all keys in the result. In a nutshell, this is what the
important parts of the result look like:
¹⁸[Link]
¹⁹[Link]
{:ok, %{
  ...
  "symbols": [
    %{
      "symbol": "ETHBTC",
      ...
      "filters": [
        ...
        %{"filterType": "PRICE_FILTER", "tickSize": tickSize, ...}
      ],
      ...
    }
  ]
}}
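The lookup can be tried offline against hand-written data. Below is a minimal sketch, assuming a plain map with string keys throughout and made-up sample values; the Sketch.ExchangeInfo module is hypothetical and the real response from the binance library may be shaped differently (the function above reads :symbols as an atom key):

```elixir
defmodule Sketch.ExchangeInfo do
  # Walks the same path as fetch_tick_size/1, but over a plain map.
  def tick_size(%{"symbols" => symbols}, symbol) do
    symbols
    |> Enum.find(&(&1["symbol"] == symbol))
    |> Map.get("filters")
    |> Enum.find(&(&1["filterType"] == "PRICE_FILTER"))
    |> Map.get("tickSize")
  end
end

# Made-up sample data, shaped like the result sketched above.
info = %{
  "symbols" => [
    %{
      "symbol" => "ETHBTC",
      "filters" => [%{"filterType" => "PRICE_FILTER", "tickSize" => "0.000001"}]
    },
    %{
      "symbol" => "XRPUSDT",
      "filters" => [
        %{"filterType" => "LOT_SIZE", "stepSize" => "0.1"},
        %{"filterType" => "PRICE_FILTER", "tickSize" => "0.0001"}
      ]
    }
  ]
}

IO.inspect(Sketch.ExchangeInfo.tick_size(info, "XRPUSDT"))
```

The pipeline has no error handling: an unknown symbol makes Enum.find/2 return nil and the following Map.get/2 call fail, which is the same behaviour as the function above.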
Trader states
Our trader will be receiving trade events sequentially and will make decisions
based on its own state and the trade event’s contents.
We will focus on a trader in 3 different states:
# /apps/naive/lib/naive/[Link]
alias [Link]

# /apps/naive/lib/naive/[Link]
def handle_cast(
      %TradeEvent{price: price},
      %State{symbol: symbol, buy_order: nil} = state
    ) do
  quantity = 100 # <= Hardcoded until chapter 7

  [Link]("Placing BUY order for #{symbol} @ #{price}, quantity: #{quantity}")

  {:ok, %[Link]{} = order} =
    Binance.order_limit_buy(symbol, quantity, price, "GTC")

  {:noreply, %{state | buy_order: order}}
end
For the time being, we will keep the quantity hardcoded as this chapter will
get really long otherwise - don’t worry, we will refactor this in one of the next chapters.
After confirming that we deal with the “new” trader(by pattern matching on the buy_order field
from state), we can safely progress to placing a new buy order. We just need to remember to return
the updated state as otherwise, trader will go on a shopping spree, as every next incoming event will
cause further buy orders(above pattern match will continue to be successful).
# /apps/naive/lib/naive/[Link]
def handle_cast(
      %TradeEvent{
        buyer_order_id: order_id,
        quantity: quantity
      },
      %State{
        symbol: symbol,
        buy_order: %[Link]{
          price: buy_price,
          order_id: order_id,
          orig_qty: quantity
        },
        profit_interval: profit_interval,
        tick_size: tick_size
      } = state
    ) do
  sell_price = calculate_sell_price(buy_price, profit_interval, tick_size)

  [Link](
    "Buy order filled, placing SELL order for " <>
      "#{symbol} @ #{sell_price}), quantity: #{quantity}"
  )

  {:ok, %[Link]{} = order} =
    Binance.order_limit_sell(symbol, quantity, sell_price, "GTC")

  {:noreply, %{state | sell_order: order}}
end
We will implement calculating the sell price in a separate function based on the buy price, profit
interval and tick_size.
Our pattern match will confirm that our buy order indeed got filled (order_id and quantity match),
so we can now proceed with placing a sell order using the calculated sell price and the quantity
retrieved from the buy order.
Again, don’t forget to return the updated state, as otherwise the trader will keep on placing sell
orders for every incoming event.
To calculate the sell price we will need to use precise math and that will require a custom module.
We will use the Decimal²⁰ module, so first, let’s alias it at the top of the file:
# /apps/naive/lib/naive/[Link]
alias Decimal, as: D
Now, to calculate the correct sell price, we can use the following formula, which gets us pretty
close to the expected value:
# /apps/naive/lib/naive/[Link]
defp calculate_sell_price(buy_price, profit_interval, tick_size) do
  fee = [Link]("1.001")
  original_price = [Link]([Link](buy_price), fee)

  net_target_price =
    [Link](
      original_price,
      [Link]("1.0", profit_interval)
    )

  gross_target_price = [Link](net_target_price, fee)

  D.to_float(
    [Link](
      D.div_int(gross_target_price, tick_size),
      tick_size
    )
  )
end
²⁰[Link]
First, we would like to convert all the numbers to Decimal structs so it will be easier to work
with them. We will also hardcode the fee, which we will refactor in one of the future chapters.
We started by calculating the original_price, which is the buy price
increased by the fee that we paid on top of it.
Next, we enlarge the originally paid price by the profit interval to get the net_target_price.
As we will be charged a fee for selling, we need to add the fee again on top of this target sell
price (gross_target_price).
Next, we will use the tick size, as Binance won’t accept any prices that aren’t divisible by the
symbol’s tick size, so we need to “normalize” them on our side.
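To make the steps concrete, here is the same arithmetic worked through with plain floats. This is only an illustrative sketch: the strategy itself uses Decimal precisely because floats are not exact, and the tick size of 0.0001 is an assumed value, not one taken from Binance. The buy price and profit interval are the ones used in the test run later in this chapter:

```elixir
buy_price = 0.25979
fee = 1.001
profit_interval = -0.01
# Assumed tick size for this sketch.
tick_size = 0.0001

# Buy price increased by the fee paid when buying (about 0.26004979).
original_price = buy_price * fee

# Price after applying the profit interval (about 0.25744929).
net_target_price = original_price * (1.0 + profit_interval)

# Fee added again for the sell side (about 0.25770674).
gross_target_price = net_target_price * fee

# Number of whole ticks that fit into the gross target price.
ticks = trunc(gross_target_price / tick_size)

IO.inspect(ticks)
# prints: 2577
```

2577 whole ticks of 0.0001 correspond to a sell price of 0.2577; the remainder below one tick is simply dropped, which is what the div_int and multiply steps achieve in the Decimal version.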
# /apps/naive/lib/naive/[Link]
def handle_cast(
      %TradeEvent{
        seller_order_id: order_id,
        quantity: quantity
      },
      %State{
        sell_order: %[Link]{
          order_id: order_id,
          orig_qty: quantity
        }
      } = state
    ) do
  [Link]("Trade finished, trader will now exit")
  {:stop, :normal, state}
end
When the sell order has been successfully filled (confirmed by the pattern matching above), there’s
nothing else for the trader to do, so it can return a tuple with the :stop atom, which will cause
the trader process to terminate.
# /apps/naive/lib/naive/[Link]
def handle_cast(%TradeEvent{}, state) do
  {:noreply, state}
end
We need this callback for cases where our trader has an “open” order(not yet filled) and the incoming
event has nothing to do with it, so it needs to be ignored.
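The way the four callbacks divide the work can be seen in a side-effect-free form. Below is a minimal sketch with a hypothetical Sketch.Decision module that uses plain maps instead of the real structs and returns an atom naming the action instead of calling Binance:

```elixir
defmodule Sketch.Decision do
  # 1. No buy order placed yet: any incoming event triggers a buy.
  def next(_event, %{buy_order: nil}), do: :place_buy_order

  # 2. The event's buyer order id and quantity match our buy order: it got filled.
  def next(
        %{buyer_order_id: id, quantity: quantity},
        %{buy_order: %{order_id: id, orig_qty: quantity}}
      ),
      do: :place_sell_order

  # 3. The event's seller order id and quantity match our sell order: cycle finished.
  def next(
        %{seller_order_id: id, quantity: quantity},
        %{sell_order: %{order_id: id, orig_qty: quantity}}
      ),
      do: :stop

  # 4. Anything else is unrelated to our open order and gets ignored.
  def next(_event, _state), do: :ignore
end

event = %{buyer_order_id: 1, seller_order_id: 9, quantity: "100"}
bought = %{buy_order: %{order_id: 1, orig_qty: "100"}, sell_order: nil}

IO.inspect(Sketch.Decision.next(event, %{buy_order: nil, sell_order: nil}))
IO.inspect(Sketch.Decision.next(event, bought))
```

Reusing the same variable name (id, quantity) in both arguments is what makes a clause match only when the event and the stored order agree. Clause order matters as well: the catch-all clause has to come last.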
# /apps/naive/lib/[Link]
defmodule Naive do
  @moduledoc """
  Documentation for `Naive`.
  """
  alias [Link]

  def send_event(%TradeEvent{} = event) do
    [Link](:trader, event)
  end
end
We will use the fact that we registered our trader process with a name to be able to cast a message
to it.
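Casting to a registered name can be demonstrated with a tiny standalone GenServer. The following sketch uses a hypothetical Sketch.Collector module registered as :collector; it simply accumulates whatever is cast to it:

```elixir
defmodule Sketch.Collector do
  use GenServer

  # Registers the process under a fixed name, like the trader does.
  def start_link(_args) do
    GenServer.start_link(__MODULE__, [], name: :collector)
  end

  @impl true
  def init(events), do: {:ok, events}

  # Casts are fire-and-forget: the sender does not wait for a reply.
  @impl true
  def handle_cast(event, events), do: {:noreply, [event | events]}

  # A synchronous call, used here only to read the collected events back.
  @impl true
  def handle_call(:events, _from, events) do
    {:reply, Enum.reverse(events), events}
  end
end

{:ok, _pid} = Sketch.Collector.start_link([])

# The caller only needs the registered name, not the pid.
:ok = GenServer.cast(:collector, {:trade, "XRPUSDT", "0.2597"})
events = GenServer.call(:collector, :events)

IO.inspect(events)
```

GenServer.cast/2 always returns :ok, even if nothing is registered under the name, so a typo in the name fails silently.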
# /apps/streamer/lib/streamer/[Link]
Naive.send_event(trade_event)
This creates a two-way link between the streamer and the naive app. In the next chapter we will fix
that, as in a perfect world those apps shouldn’t even be aware of each other’s existence.
config :binance,
  api_key: "YOUR-API-KEY-HERE",
  secret_key: "YOUR-SECRET-KEY-HERE"
Important note: To be able to run the test below and perform real trades, a Binance account is
required with a balance of at least 20 USDT. In the 4th chapter we will focus on creating a
BinanceMock that will allow us to run our bot without the requirement for a real Binance account.
You don’t need to test run it now if you don’t need/want to have an account.
Test run
Now it’s time to give our implementation a run for its money. Once again, to be able to do that
you will need to have at least 20 USDT tokens in your Binance wallet and you will lose just under
0.5% of your USDTs (as the “expected profit” is below 0 to quickly showcase the full trade cycle)
in the following test:
$ iex -S mix
...
iex(1)> [Link].start_link(%{symbol: "XRPUSDT", profit_interval: [Link]("-\
0.01")})
[Link].648 [info] Initializing new trader for XRPUSDT
{:ok, #PID<0.355.0>}
iex(2)> Streamer.start_streaming("xrpusdt")
{:ok, #PID<0.372.0>}
iex(3)>
[Link].561 [info] Placing BUY order for XRPUSDT @ 0.25979000, quantity: 100
[Link].831 [info] Buy order filled, placing SELL order for XRPUSDT @ 0.2577, quant\
ity: 100
[Link].094 [info] Trade finished, trader will now exit
After starting the IEx session, start the trader process with a map containing symbol and profit
interval. To be able to quickly test full trade cycle we will pass sub-zero profit interval instead of
waiting for the increase in price.
Next, we will start streaming on the same symbol, please be aware that this will cause an immediate
reaction in the trader process.
We can see that our trader placed a buy order at 25.979c per XRP, it was filled in under 300ms, so
then the trader placed a sell order at ∼25.77c
which was also filled in under 300ms. This way the trader finished the trade
cycle and process can terminate.
That’s it. Congratulations! You just made your first algorithmic trade and you should be proud of
that! In the process of creating that algorithm we touched on multiple topics, including GenServer
and relying on its state and external data (trade events) to perform different actions - this is a
very common workflow that Elixir engineers follow and it’s great to see it in action.
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github²¹
²¹[Link]
Chapter 3 - Introduce PubSub as a communication method
Objectives
• consider reasons why introducing PubSub communication would be beneficial
• implement the PubSub communication between the [Link] and the [Link](s)
Design
First, let’s look at the current situation:
Current situation
We started with the Binance streamer calling the send_event/1 function on the Naive module. The
Naive module then calls the trader process using GenServer’s cast/2 function (via its registered
name).
The next step in the process of extending our trading strategy will be to scale it to run multiple
[Link] processes in parallel. To be able to do that we will need to remove the option to
register the trader process with a name (as only one process can be registered under a single name).
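The name limitation is easy to reproduce. The sketch below uses an Agent as a stand-in for the trader (any named process behaves the same way) and tries to register two processes under the :trader name:

```elixir
# The first process registers the :trader name successfully.
{:ok, first} = Agent.start_link(fn -> :state end, name: :trader)

# The second attempt does not start a new process: the name is already taken.
second = Agent.start_link(fn -> :state end, name: :trader)

IO.inspect(second)
```

The second call returns an {:error, {:already_started, pid}} tuple carrying the pid of the first process, so at most one trader could ever exist under that design.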
The second issue with that design was the fact that the Streamer needs to be aware of all processes
that are interested in the streamed data and explicitly push that information to them.
To fix those issues we will invert the design and introduce a PubSub mechanism:
PubSub introduced to invert the dependency and enable multiple parallel traders
The streamer will broadcast trade events to the PubSub topic and whatever is interested in that data,
can subscribe to the topic and it will receive the broadcasted messages.
There’s no coupling between the Streamer and Naive app any more.
We can now introduce multiple traders that will subscribe to the topic and
they will receive messages from the PubSub:
Going even further down the line we can picture that system could consist of other processes
interested in the streamed data. Example of those could be a process that will save all streamed
information to the database to be utilized in backtesting later on:
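The publish/subscribe idea can be sketched with nothing but the standard library. The example below uses Elixir’s built-in Registry with duplicate keys as a stand-in; it is not the PubSub library chosen for the implementation, and the Sketch.PubSub name and message shape are made up:

```elixir
# A registry with duplicate keys lets many processes register under one topic.
{:ok, _registry} = Registry.start_link(keys: :duplicate, name: Sketch.PubSub)

topic = "trade_events:XRPUSDT"

# Subscribe: the calling process registers itself under the topic.
{:ok, _owner} = Registry.register(Sketch.PubSub, topic, [])

# Broadcast: send the message to every process registered under the topic.
Registry.dispatch(Sketch.PubSub, topic, fn entries ->
  for {pid, _value} <- entries, do: send(pid, {:trade_event, "XRPUSDT", "0.2946"})
end)

received =
  receive do
    {:trade_event, symbol, price} -> {symbol, price}
  after
    1_000 -> :timeout
  end

IO.inspect(received)
```

The broadcaster only knows the topic name. It never refers to the subscribers directly, which is exactly the decoupling the design above is after.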
Implementation
We will start by adding a [Link]²² library to both the Streamer and Naive apps (as both will be
using it, the Streamer app as a broadcaster and the Naive app as a subscriber).
Scrolling down through its readme on GitHub we can see that we need to add :phoenix_pubsub to
the list of dependencies:
Remember to place it so the list will keep alphabetical order. Second step in the readme says that we
need to add PubSub as a child of our app. We need to decide where we will put it, Streamer sounds
like a good starting point. We will modify the /apps/streamer/lib/streamer/[Link]
module by appending the PubSub to it:
# /apps/streamer/lib/streamer/[Link]
def start(_type, _args) do
  children = [
    {
      [Link],
      name: [Link], adapter_name: [Link].PG2
    }
  ]
  ...
end
We will add the :adapter_name option to instruct PubSub to use the pg²³ adapter, which will give us
distributed process groups.
We will now modify the streamer to broadcast a message to PubSub instead of using the Naive
module’s function:
# /apps/streamer/lib/streamer/[Link]
defp process_event(...) do
  ...

  [Link](
    [Link],
    "trade_events:#{trade_event.symbol}",
    trade_event
  )
end
Inside the trader on init we need to subscribe to the “trade_events” PubSub channel:
²³[Link]
# /apps/naive/lib/naive/[Link]
def init(...) do
  ...

  [Link](
    [Link],
    "trade_events:#{symbol}"
  )

  ...
end
Next, we need to convert all handle_cast callbacks to handle_info inside our Trader module, as
PubSub doesn’t use [Link]/2 to send messages over to subscribers.
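The difference is easy to see in isolation: a message delivered with a plain send/2 lands in handle_info/2, not in handle_cast/2. Below is a minimal sketch with a hypothetical Sketch.Subscriber GenServer that reports back to the process that started it:

```elixir
defmodule Sketch.Subscriber do
  use GenServer

  def start_link(parent), do: GenServer.start_link(__MODULE__, parent)

  @impl true
  def init(parent), do: {:ok, parent}

  # Plain messages (sent with send/2) are handled here.
  @impl true
  def handle_info({:trade_event, _symbol, _price} = event, parent) do
    send(parent, {:handled_by, :handle_info, event})
    {:noreply, parent}
  end
end

{:ok, pid} = Sketch.Subscriber.start_link(self())

# Simulates a message delivery that does not go through GenServer.cast/2.
send(pid, {:trade_event, "XRPUSDT", "0.2946"})

result =
  receive do
    {:handled_by, callback, _event} -> callback
  after
    1_000 -> :timeout
  end

IO.inspect(result)
```

Apart from the callback name, the clauses keep the same shape: both handle_cast/2 and handle_info/2 take the message and the state and return tuples such as {:noreply, state}.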
The final change will be to remove the send_event function from the Naive
module as it’s no longer required.
Our update is now finished so we can start an iex session to see how it works.
First, we will start a streamer process that will broadcast messages
to PubSub. Next, we will start trading on the same symbol. On init, trader will subscribe to a PubSub
channel and it will make a full trade cycle.
$ iex -S mix
...
iex(1)> Streamer.start_streaming("xrpusdt")
{:ok, #PID<0.483.0>}
iex(2)> [Link].start_link(%{symbol: "XRPUSDT", profit_interval: [Link]("-\
0.01")})
[Link].482 [info] Initializing new trader for XRPUSDT
{:ok, #PID<0.474.0>}
iex(3)>
[Link].179 [info] Placing BUY order for XRPUSDT @ 0.29462000, quantity: 100
[Link].783 [info] Buy order filled, placing SELL order for XRPUSDT @ 0.29225), qu\
antity: 100.00000000
[Link].029 [info] Trade finished, trader will now exit
This shows that new trader process successfully subscribed to the PubSub, received the broadcasted
messages, placed buy/sell orders and terminated after full trade cycle finished.
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github²⁴
²⁴[Link]
Chapter 4 - Mock the Binance API
Objectives
• design the binance mock application
• create a new app
• implement getting exchange info
• implement placing buy and sell orders
• implement callback for incoming trade events
• upgrade trader and config
• test the implementation
Design
First, let’s start with the current state:
We will set up a config so it points to the Binance client to be used - either Binance or BinanceMock.
As for BinanceMock itself, it will have the same interface as the Binance module.
It will need to store both buy and sell orders and it will allow us to retrieve them. That will cover the
REST functions, but Binance also streams back trade events for those orders as they get filled. That’s
why BinanceMock will also need to broadcast fake events to the “trade_events:#{symbol}” PubSub topic so
the trader will pick them up:
When exactly should we broadcast those fake trade events? Well, the best thing
that we can do is make the BinanceMock process subscribe to the trade events stream and try to
broadcast fake trade events whenever the price of an order would be matched:
Starting from the arrow on the left, our naive strategy will place an order at the current price.
In this hypothetical scenario, the price was rising for a moment after placing the buy order, so
BinanceMock will keep on waiting until a trade event gets broadcasted from the PubSub with
a price below the buy order’s price. At that moment BinanceMock will generate a fake trade event
and broadcast it to the same PubSub topic.
The trader will get that event and assume that it came from Binance and that the buy order got filled,
so it will place a sell order.
Similarly to the buy order, BinanceMock will keep on waiting until a trade event gets broadcasted
from the PubSub with a price above the sell order’s price. At that moment BinanceMock will
generate a fake trade event and broadcast it to the same PubSub topic.
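The matching rule described above can be sketched as a pair of function clauses. This is only an illustration: the module name is hypothetical and plain floats are used for brevity, whereas the chapter's code compares Decimal values.

```elixir
defmodule FillSketch do
  # A resting buy order counts as filled once the market trades below its price.
  def filled?(%{side: "BUY", price: order_price}, trade_price),
    do: trade_price < order_price

  # A resting sell order counts as filled once the market trades above its price.
  def filled?(%{side: "SELL", price: order_price}, trade_price),
    do: trade_price > order_price
end

buy = %{side: "BUY", price: 0.2946}

# Price still above the buy order: keep waiting.
still_waiting = FillSketch.filled?(buy, 0.2950)

# Price dropped below the buy order: time to emit a fake fill.
buy_filled = FillSketch.filled?(buy, 0.2940)
```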
Enough theory for now, let’s get our hands dirty with some coding:
$ cd apps
$ mix new binance_mock --sup
The order_books map will consist of :"#{symbol}" => %OrderBook{} entries. We will define the %OrderBook{}
struct as 3 lists: buy_side, sell_side and historical:
# /apps/binance_mock/lib/binance_mock.ex
defmodule BinanceMock do
  use GenServer

  alias Decimal, as: D

  require Logger

  defmodule State do
    defstruct order_books: %{}, subscriptions: [], fake_order_id: 1
  end

  defmodule OrderBook do
    defstruct buy_side: [], sell_side: [], historical: []
  end

  def start_link(_args) do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  def init(_args) do
    {:ok, %State{}}
  end
end
# /apps/binance_mock/lib/binance_mock.ex
def get_exchange_info() do
  Binance.get_exchange_info()
end

# /apps/binance_mock/lib/binance_mock.ex
def order_limit_buy(symbol, quantity, price, "GTC") do
  order_limit(symbol, quantity, price, "BUY")
end

def order_limit_sell(symbol, quantity, price, "GTC") do
  order_limit(symbol, quantity, price, "SELL")
end
• ensure that quantity and price are float values as the Binance module accepts both strings and
floats
• generate a fake order based on symbol, quantity, price, and side
• cast a message to the BinanceMock process to add the fake order
• return with a tuple with %OrderResponse{} struct to be consistent with the Binance module:
# /apps/binance_mock/lib/binance_mock.ex
defp order_limit(symbol, quantity, price, side) do
  quantity = [Link]("#{quantity}") |> elem(0)
  price = [Link]("#{price}") |> elem(0)

  %[Link]{} =
    fake_order =
    generate_fake_order(
      symbol,
      quantity,
      price,
      side
    )

  [Link](
    __MODULE__,
    {:add_order, fake_order}
  )

  {:ok, convert_order_to_order_response(fake_order)}
end
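We can now move on to the implementation of the handle_cast/2 callback that handles the :add_order
message by adding the order to the order book for the order’s symbol.
It needs to do two things:
• subscribe to the trade events topic for the order’s symbol
• add the order to the order book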
# /apps/binance_mock/lib/binance_mock.ex
def handle_cast(
      {:add_order, %[Link]{symbol: symbol} = order},
      %State{
        order_books: order_books,
        subscriptions: subscriptions
      } = state
    ) do
  new_subscriptions = subscribe_to_topic(symbol, subscriptions)
  updated_order_books = add_order(order, order_books)

  {
    :noreply,
    %{
      state
      | order_books: updated_order_books,
        subscriptions: new_subscriptions
    }
  }
end
We will start with the implementation of the subscribe_to_topic/2 function. We need to make sure
that the symbol is uppercased as well as check whether we have already subscribed to that topic. If we
have not, we can safely use the PubSub module to subscribe to the trade_events:#{symbol} topic for this
symbol.
We need to remember to prepend the symbol to the list of subscriptions and return the updated list:
# /apps/binance_mock/lib/binance_mock.ex
defp subscribe_to_topic(symbol, subscriptions) do
  symbol = [Link](symbol)
  stream_name = "trade_events:#{symbol}"

  case [Link]?(subscriptions, symbol) do
    false ->
      [Link]("BinanceMock subscribing to #{stream_name}")

      [Link](
        [Link],
        stream_name
      )

      [symbol | subscriptions]

    _ ->
      subscriptions
  end
end
Next, it’s time to implement the add_order function. First, we need to get the order book for the
symbol of the order. Depending on the side of the order, we will update either the buy_side or the sell_side
list, remembering that both sides are sorted. We are sorting them so we can easily grab all orders
that should be filled whenever a trade event arrives. This will become clearer when we write a handle
callback for incoming trade events:
# /apps/binance_mock/lib/binance_mock.ex
defp add_order(
       %[Link]{symbol: symbol} = order,
       order_books
     ) do
  order_book =
    [Link](
      order_books,
      :"#{symbol}",
      %OrderBook{}
    )

  order_book =
    if [Link] == "SELL" do
      [Link]!(
        order_book,
        :sell_side,
        [order | order_book.sell_side]
        |> [Link](&[Link]?([Link](&[Link]), [Link](&[Link])))
      )
    else
      [Link]!(
        order_book,
        :buy_side,
        [order | order_book.buy_side]
        |> [Link](&[Link]?([Link](&[Link]), [Link](&[Link])))
      )
    end

  [Link](order_books, :"#{symbol}", order_book)
end
# /apps/binance_mock/lib/binance_mock.ex
defp generate_fake_order(symbol, quantity, price, side)
     when is_binary(symbol) and
            is_float(quantity) and
            is_float(price) and
            (side == "BUY" or side == "SELL") do
  current_timestamp = :os.system_time(:millisecond)
  order_id = [Link](__MODULE__, :generate_id)
  client_order_id = :[Link](:md5, "#{order_id}") |> Base.encode16()

  [Link](%{
    symbol: symbol,
    order_id: order_id,
    client_order_id: client_order_id,
    price: Float.to_string(price),
    orig_qty: Float.to_string(quantity),
    executed_qty: "0.00000000",
    cummulative_quote_qty: "0.00000000",
    status: "NEW",
    time_in_force: "GTC",
    type: "LIMIT",
    side: side,
    stop_price: "0.00000000",
    iceberg_qty: "0.00000000",
    time: current_timestamp,
    update_time: current_timestamp,
    is_working: true
  })
end
# /apps/binance_mock/lib/binance_mock.ex
defp convert_order_to_order_response(%[Link]{} = order) do
  %{
    struct(
      [Link],
      order |> Map.to_list()
    )
    | transact_time: [Link]
  }
end
The last piece needed to finish support for placing buy and sell orders is a callback that will increment
the fake order id and return it:
# /apps/binance_mock/lib/binance_mock.ex
def handle_call(
      :generate_id,
      _from,
      %State{fake_order_id: id} = state
    ) do
  {:reply, id + 1, %{state | fake_order_id: id + 1}}
end
# /apps/binance_mock/lib/binance_mock.ex
def get_order(symbol, time, order_id) do
  [Link](
    __MODULE__,
    {:get_order, symbol, time, order_id}
  )
end
The callback itself is pretty straightforward. We will need to get the order book for the passed symbol. As
we don’t know the order’s side, we will concatenate all 3 lists (buy_side, sell_side and historical) and try
to find an order that matches the passed symbol, time and order_id:
# /apps/binance_mock/lib/binance_mock.ex
def handle_call(
      {:get_order, symbol, time, order_id},
      _from,
      %State{order_books: order_books} = state
    ) do
  order_book =
    [Link](
      order_books,
      :"#{symbol}",
      %OrderBook{}
    )

  result =
    (order_book.buy_side ++
       order_book.sell_side ++
       order_book.historical)
    |> [Link](
      &(&[Link] == symbol and
          &[Link] == time and
          &1.order_id == order_id)
    )

  {:reply, {:ok, result}, state}
end
• get the order book for the symbol from the trade event
• use the take_while/2 function on the buy orders with prices that are greater than the current
price - we can update their status to filled.
• use the take_while/2 function again, this time to sell orders with prices less than the current
price, we will also update their statuses to filled.
• concat both lists of filled orders, convert them to trade events and broadcast them to the
PubSub’s trade_events topic.
• remove the filled orders from buy and sell lists and put them into the historical list.
Here we can clearly see the benefit of sorting the lists: we can use functions like take_while/2 and
drop/2 instead of filter/2 and reject/2 (the latter two go through the whole lists, which could become a
bottleneck when many open orders are active):
# /apps/binance_mock/lib/binance_mock.ex
def handle_info(
      %[Link]{} = trade_event,
      %{order_books: order_books} = state
    ) do
  order_book =
    [Link](
      order_books,
      :"#{trade_event.symbol}",
      %OrderBook{}
    )

  filled_buy_orders =
    order_book.buy_side
    |> Enum.take_while(&[Link]?([Link](trade_event.price), [Link](&[Link])))
    |> [Link](&[Link]!(&1, :status, "FILLED"))

  filled_sell_orders =
    order_book.sell_side
    |> Enum.take_while(&[Link]?([Link](trade_event.price), [Link](&[Link])))
    |> [Link](&[Link]!(&1, :status, "FILLED"))

  (filled_buy_orders ++ filled_sell_orders)
  |> [Link](&convert_order_to_event(&1, trade_event.event_time))
  |> [Link](&broadcast_trade_event/1)

  remaining_buy_orders =
    order_book.buy_side
    |> [Link](length(filled_buy_orders))

  remaining_sell_orders =
    order_book.sell_side
    |> [Link](length(filled_sell_orders))

  order_books =
    [Link]!(
      order_books,
      :"#{trade_event.symbol}",
      %{
        buy_side: remaining_buy_orders,
        sell_side: remaining_sell_orders,
        historical:
          filled_buy_orders ++
            filled_sell_orders ++
            order_book.historical
      }
    )

  {:noreply, %{state | order_books: order_books}}
end
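The take_while/2 plus drop/2 idea from the callback above can be tried on a toy buy side. This is a sketch with assumed data: plain maps and float prices replace the order structs and Decimal comparisons, and the list is assumed to be sorted from the highest price to the lowest.

```elixir
defmodule MatchSketch do
  # With the highest buy price first, every order that can fill sits at the
  # front of the list, so scanning stops at the first order that cannot fill.
  def match_buys(buy_side, trade_price) do
    filled = Enum.take_while(buy_side, &(trade_price < &1.price))
    remaining = Enum.drop(buy_side, length(filled))
    {filled, remaining}
  end
end

buy_side = [
  %{id: 1, price: 0.30},
  %{id: 2, price: 0.29},
  %{id: 3, price: 0.28}
]

{filled, remaining} = MatchSketch.match_buys(buy_side, 0.285)
```

Enum.split_while/2 would return both parts in a single pass, which is an alternative worth considering if the two-step version ever shows up in profiling.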
Inside the callback we referred to two new functions that we will implement now (convert_order_to_event
and broadcast_trade_event).
Starting with the convert_order_to_event function, it will simply return a new [Link]
struct filled with data. An interesting thing to observe here is that again all values are predictable
and the function will return the same values for the same input - this will become beneficial for
backtesting over and over again and comparing the behaviour between runs:
# /apps/binance_mock/lib/binance_mock.ex
defp convert_order_to_event(%[Link]{} = order, time) do
  %[Link]{
    event_type: [Link],
    event_time: time - 1,
    symbol: [Link],
    trade_id: Integer.floor_div(time, 1000),
    price: [Link],
    quantity: order.orig_qty,
    buyer_order_id: order.order_id,
    seller_order_id: order.order_id,
    trade_time: time - 1,
    buyer_market_maker: false
  }
end
Broadcasting trade event to PubSub will be the last function that will finish
the implementation of BinanceMock for now. It’s important to upcase the symbol as we want to be
sure that we will match the topic name which is case-sensitive:
# /apps/binance_mock/lib/binance_mock.ex
defp broadcast_trade_event(%[Link]{} = trade_event) do
  symbol = [Link](trade_event.symbol)

  [Link](
    [Link],
    "trade_events:#{symbol}",
    trade_event
  )
end
# /apps/binance_mock/lib/binance_mock/[Link]
...
  def start(_type, _args) do
    children = [
      {BinanceMock, []}
    ]
    ...
  end
end
# /apps/naive/lib/naive/[Link]
@binance_client Application.get_env(:naive, :binance_client)
We need to replace all direct calls to the Binance module with calls to the @binance_client attribute
inside the [Link]:
# /apps/naive/lib/naive/[Link]

...
@binance_client.order_limit_buy(
...
@binance_client.order_limit_sell
...
@binance_client.get_exchange_info()
...
As the [Link] now relies on the config to specify which Binance client it should use,
we need to add it to the config:
# /config/[Link]

config :naive,
  binance_client: BinanceMock
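The effect of this setting can be sketched without the real libraries. Both client modules and the :client_sketch application name below are hypothetical stand-ins, and Application.put_env/3 plays the role of the config file entry:

```elixir
defmodule RealClientSketch do
  def order_limit_buy(symbol, _quantity, _price),
    do: {:ok, "real order for #{symbol}"}
end

defmodule MockClientSketch do
  def order_limit_buy(symbol, _quantity, _price),
    do: {:ok, "mock order for #{symbol}"}
end

# Stands in for the `config :naive, binance_client: ...` entry.
Application.put_env(:client_sketch, :binance_client, MockClientSketch)

# The caller only knows the interface; the config decides the implementation.
client = Application.get_env(:client_sketch, :binance_client)
result = client.order_limit_buy("XRPUSDT", 100, "0.2946")
```

One design note: a module attribute is evaluated at compile time, so changing the config value requires recompiling the module that reads it. On Elixir 1.10 and later, Application.compile_env/2 makes that compile-time dependency explicit.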
Last modification to our system will be to modify the [Link] of the binance_mock app to list all
deps required for it to work:
# /apps/binance_mock/[Link]
...
  defp deps do
    [
      {:binance, "~> 0.7.1"},
      {:decimal, "~> 2.0"},
      {:phoenix_pubsub, "~> 2.0"},
      {:streamer, in_umbrella: true}
    ]
  end
...
We also add :binance_mock to the list of deps of the naive app(as the Naive app will use either
Binance or BinanceMock to “trade”):
# /apps/naive/[Link]
...
  defp deps do
    [
      ...
      {:binance_mock, in_umbrella: true}
      ...
    ]
  end
...
$ iex -S mix
...
iex(1)> [Link](BinanceMock)
#PID<0.320.0> # <- confirms that BinanceMock process is alive
iex(2)> Streamer.start_streaming("xrpusdt")
{:ok, #PID<0.332.0>}
iex(3)> [Link].start_link(%{symbol: "XRPUSDT", profit_interval: [Link]("-0.001")})
[Link].232 [info] Initializing new trader for XRPUSDT
{:ok, #PID<0.318.0>}
[Link].826 [info] Placing BUY order for XRPUSDT @ 0.29520000, quantity: 100
[Link].569 [info] Buy order filled, placing SELL order for XRPUSDT @ 0.29549), quantity: 100.0
[Link].391 [info] Trade finished, trader will now exit
As the config already points to BinanceMock, we can continue as before by starting the streaming and trading
on the symbol. The trader is using the BinanceMock and it looks like everything works as if it were
dealing with a real exchange.
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github²⁵
²⁵[Link]
Chapter 5 - Enable parallel trading on multiple symbols
Objectives
• design supervision tree that will allow trading using multiple traders in parallel per symbol
• update application supervisor
• implement [Link]
• implement [Link]
GenServer.start_link/3 creates a link between IEx’s process and the new [Link] process.
Whenever a trader terminates (either because it finished the trading cycle or because of an error), a new one won’t
get started as there’s no supervision at all.
We can do much better than that with a little bit of help from Elixir and OTP.
Let’s introduce a supervisor above our trader process. It will start a new trader process whenever
the previous one finishes or crashes:
This looks much better but there are a few problems with it. When the trader starts to place
orders, it will be in some state (it will hold buy/sell orders) that the supervisor won’t be aware of. In
case of the trader crashing, the supervisor will start a new trader without any knowledge of possibly
placed orders or any other information from the state (it will be started with a “fresh” state).
To fix that we need to keep a copy of the trader’s state outside of the trader process - that’s why we
will introduce a new server called [Link] that will keep track of traders’ data:
The [Link] will become the interface to start new traders. It will call the start_child/1
function of the Supervisor, then consequently DynamicTraderSupervisor will call the start_link/1
function of our [Link] module.
We can also see that our [Link]’s are now started with the temporary restart option. Setting
this option will disallow the Supervisor from restarting the traders on its own. The responsibility of
restarting traders will now be shifted to the leader. The leader will monitor the traders and restart them
to a correct state when any of them crashes.
As the trader’s state gets updated, it will notify the leader about its new state to be stored. This way,
whenever a trader crashes, the leader will be able to start a new trader process with the last known state.
This setup will also allow us to start and supervise multiple traders for a single symbol, which our
naive strategy will require in the future (next chapter).
For each symbol that we will be trading on, we need the above trio of services (Leader +
DynamicTraderSupervisor + Trader). To effectively initialize (and supervise) them, we will add an
[Link] that will start both [Link] and [Link]:
We will need multiple symbol supervisors, one for each symbol that we would like to trade on. As
with traders, they will be dynamically started on demand, this should give us a hint that we need
another dynamic supervisor that will supervise symbol supervisors and will be direct child of our
[Link]([Link] module):
You could ask yourself why we don’t need some additional server to track which symbols are traded
at the moment (in the same way as [Link] tracks [Link]). The answer is that we don’t
need to track them as we register all [Link] with a name containing symbol that
they trade on. This way we will always be able to refer to them by registered name instead of
pids/refs.
Here’s what happens starting from the top of the graph:
• the [Link] is our top-level application’s supervisor for the naive app, it was auto-generated
as a part of the naive app
• it has a single child [Link], which has strategy one_for_one and all
of its children are [Link]
• [Link] process will start 2 further children: the [Link] and DynamicTraderSupervisor,
both created on init
• the [Link] will ask DynamicTraderSupervisor to start the [Link] child process(es)
This can be a little bit confusing at the moment but it will get a lot easier
as we will write the code. Let’s get to it!
# /apps/naive/lib/naive/[Link]
def start(_type, _args) do
  children = [
    {
      DynamicSupervisor,
      strategy: :one_for_one,
      name: [Link]
    }
  ]

  ...
end
# /apps/naive/lib/[Link]
def start_trading(symbol) do
  symbol = [Link](symbol)

  {:ok, _pid} =
    DynamicSupervisor.start_child(
      [Link],
      {[Link], symbol}
    )
end
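To see DynamicSupervisor.start_child/2 in isolation, the sketch below starts a dynamic supervisor under a made-up name (SketchDynamicSupervisor) and adds one child per symbol. An Agent stands in for the symbol supervisor, which is an assumption made purely to keep the example dependency-free.

```elixir
{:ok, _sup} =
  DynamicSupervisor.start_link(
    strategy: :one_for_one,
    name: SketchDynamicSupervisor
  )

# Each call adds one more child at runtime.
for symbol <- ["ADAUSDT", "XRPUSDT"] do
  {:ok, _pid} =
    DynamicSupervisor.start_child(
      SketchDynamicSupervisor,
      %{id: symbol, start: {Agent, :start_link, [fn -> symbol end]}}
    )
end

counts = DynamicSupervisor.count_children(SketchDynamicSupervisor)
```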
Implement [Link]
Next, it’s time for the [Link]. The first step will be to create a file called
symbol_supervisor.ex inside the apps/naive/lib/naive directory. There’s no point in using the
DynamicSupervisor²⁶, as we know the children that we would like to start automatically on init. This is a
full implementation of the supervisor and it’s as simple as listing child processes inside the init
function:
# /apps/naive/lib/naive/symbol_supervisor.ex
defmodule [Link] do
  use Supervisor

  require Logger

  def start_link(symbol) do
    Supervisor.start_link(
      __MODULE__,
      symbol,
      name: :"#{__MODULE__}-#{symbol}"
    )
  end

  def init(symbol) do
    [Link]("Starting new supervision tree to trade on #{symbol}")

    [Link](
      [
        {
          DynamicSupervisor,
          strategy: :one_for_one,
          name: :"[Link]-#{symbol}"
        },
        {[Link], symbol}
      ],
      strategy: :one_for_all
    )
  end
end
We registered the [Link] processes with names, which will help us understand
the supervision tree inside the observer GUI(it will also allow us to stop those supervisors in the
future).
As mentioned previously whenever either the [Link] or [Link]-#{symbol}
would crash we would like to kill the other child process as we won’t be able to recover the state -
it’s just easier to init both again.
Implement [Link]
It’s time for the [Link] module, again, first step will be to create a file called [Link] inside
apps/naive/lib/naive directory. At this moment it will be a skeleton GenServer implementation
just to get the code to compile:
# /apps/naive/lib/naive/[Link]
defmodule [Link] do
  use GenServer

  def start_link(symbol) do
    GenServer.start_link(
      __MODULE__,
      symbol,
      name: :"#{__MODULE__}-#{symbol}"
    )
  end

  def init(symbol) do
    {:ok, %{symbol: symbol}}
  end
end
At this moment we have half of the supervision tree working so we can give it
a spin in iex. Using the observer we will be able to see all processes created when start trading
function gets called:
$ iex -S mix
...
iex(1)> :[Link]()
To clearly see the supervision tree we will click on “Applications” tab at the top - the following tree
of processes will be shown on the left:
If any other process tree is visible, go to the list on the left and select the naive application.
The [Link] is our [Link] module (you can confirm that by checking the name
option sent to the start_link function inside the module). It starts the [Link].
We can now call the Naive.start_trading/1 function a couple of times to see what the tree will look like
with additional processes (go back to the iex session):
...
iex(2)> Naive.start_trading("adausdt")
[Link].974 [info] Starting new supervision tree to trade on ADAUSDT
{:ok, #PID<0.340.0>}
iex(3)> Naive.start_trading("xrpusdt")
[Link].117 [info] Starting new supervision tree to trade on XRPUSDT
{:ok, #PID<0.345.0>}
• SymbolSupervisor-ADAUSDT
• SymbolSupervisor-XRPUSDT
# /apps/naive/lib/naive/[Link]
...
alias [Link]

require Logger

@binance_client Application.get_env(:naive, :binance_client)

defmodule State do
  defstruct symbol: nil,
            settings: nil,
            traders: []
end

defmodule TraderData do
  defstruct pid: nil,
            ref: nil,
            state: nil
end
# /apps/naive/lib/naive/[Link]
def init(symbol) do
  {:ok,
   %State{
     symbol: symbol
   }, {:continue, :start_traders}}
end
The [Link] will fetch symbol settings and, based on them, build the state for traders
so they don’t need to fetch the same settings again. It will also start as many traders as are set
under the chunks key in the settings:
# /apps/naive/lib/naive/[Link]
# below init()
def handle_continue(:start_traders, %{symbol: symbol} = state) do
  settings = fetch_symbol_settings(symbol)
  trader_state = fresh_trader_state(settings)

  traders =
    for _i <- 1..[Link],
        do: start_new_trader(trader_state)

  {:noreply, %{state | settings: settings, traders: traders}}
end
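The init/handle_continue pairing can be reduced to a small, self-contained sketch. The module name and the placeholder trader list are hypothetical; the point is only that init/1 returns immediately while the slower setup runs in handle_continue/2 before any other message is processed.

```elixir
defmodule ContinueSketch do
  use GenServer

  def start_link(symbol), do: GenServer.start_link(__MODULE__, symbol)

  @impl true
  def init(symbol) do
    # Return quickly; the slow setup is deferred to handle_continue/2.
    {:ok, %{symbol: symbol, traders: []}, {:continue, :start_traders}}
  end

  @impl true
  def handle_continue(:start_traders, state) do
    # Placeholder for fetching settings and starting trader processes.
    {:noreply, %{state | traders: [:trader_1]}}
  end

  @impl true
  def handle_call(:traders, _from, state), do: {:reply, state.traders, state}
end

{:ok, pid} = ContinueSketch.start_link("XRPUSDT")
traders = GenServer.call(pid, :traders)
```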
Fetching symbol settings will be hardcoded for the time being to keep this chapter focused. We will also
move the code responsible for fetching the tick
size from the [Link] to the [Link] and hardcode the rest of the values:
# /apps/naive/lib/naive/[Link]
defp fetch_symbol_settings(symbol) do
  tick_size = fetch_tick_size(symbol)

  %{
    symbol: symbol,
    chunks: 1,
    # -0.12% for quick testing
    profit_interval: [Link]("-0.0012"),
    tick_size: tick_size
  }
end

defp fetch_tick_size(symbol) do
  @binance_client.get_exchange_info()
  |> elem(1)
  |> [Link](:symbols)
  |> [Link](&(&1["symbol"] == symbol))
  |> [Link]("filters")
  |> [Link](&(&1["filterType"] == "PRICE_FILTER"))
  |> [Link]("tickSize")
  |> [Link]()
end
Additionally, we need to create the helper function that we used inside the handle_continue/2 callback,
called fresh_trader_state/1:
# /apps/naive/lib/naive/[Link]
# place this one above the `fetch_symbol_settings` function
defp fresh_trader_state(settings) do
  struct([Link], settings)
end
Starting a new trader isn’t any different from the code that we already wrote to start a new
[Link]. We need to call the DynamicSupervisor.start_child/2 function and start
to monitor the process:
# /apps/naive/lib/naive/[Link]
defp start_new_trader(%[Link]{} = state) do
  {:ok, pid} =
    DynamicSupervisor.start_child(
      :"[Link]-#{[Link]}",
      {[Link], state}
    )

  ref = [Link](pid)

  %TraderData{pid: pid, ref: ref, state: state}
end
# /apps/naive/lib/naive/[Link]
defmodule [Link] do
  use GenServer, restart: :temporary
  ...
Next, we will update start_link/1 and init/1 to take the state instead of building it from args:
# /apps/naive/lib/naive/[Link]
def start_link(%State{} = state) do
  GenServer.start_link(__MODULE__, state)
end

def init(%State{symbol: symbol} = state) do
  symbol = [Link](symbol)

  [Link]("Initializing new trader for symbol(#{symbol})")

  [Link](
    [Link],
    "trade_events:#{symbol}"
  )

  {:ok, state}
end
Next, we need to update two handle_info/2 callbacks that change the state of the [Link]
process(when placing buy order and when placing sell order). They will need to notify the
[Link] that state is changed before returning it:
# /apps/naive/lib/naive/[Link]
...

def handle_info(
      ...
    ) do
  [Link]("Placing buy order (#{symbol}@#{price})")
  ...
  new_state = %{state | buy_order: order}
  [Link](:trader_state_updated, new_state)
  {:noreply, new_state}
end

def handle_info(
      ...
    ) do
  ...
  [Link]("Buy order filled, placing sell order ...")
  ...

  new_state = %{state | sell_order: order}
  [Link](:trader_state_updated, new_state)
  {:noreply, new_state}
end
...
# /apps/naive/lib/naive/[Link]
# below init

def notify(:trader_state_updated, trader_state) do
  [Link](
    :"#{__MODULE__}-#{trader_state.symbol}",
    {:update_trader_state, trader_state}
  )
end
Now, it’s time for a callback function that will handle the trader state update. As this is a
handle_call/3 callback, we have access to the pid of the trader that sent the notification message. We will
try to find that trader in the list of traders. If that’s successful, we will update the cached state for that
trader locally:
# /apps/naive/lib/naive/[Link]
# below handle_continue
def handle_call(
      {:update_trader_state, new_trader_state},
      {trader_pid, _},
      %{traders: traders} = state
    ) do
  case Enum.find_index(traders, &(&[Link] == trader_pid)) do
    nil ->
      [Link](
        "Tried to update the state of trader that leader is not aware of"
      )

      {:reply, :ok, state}

    index ->
      old_trader_data = [Link](traders, index)
      new_trader_data = %{old_trader_data | :state => new_trader_state}

      {:reply, :ok, %{state | :traders => List.replace_at(traders, index, new_trader_data)}}
  end
end
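The way the callback above learns who sent the update can be shown with a tiny sketch. The module and message names are hypothetical; the only fact relied on is that the second argument of handle_call/3 is a {pid, tag} tuple identifying the caller.

```elixir
defmodule CallerSketch do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, nil)

  @impl true
  def init(_), do: {:ok, nil}

  # The second argument is {caller_pid, tag}; the pid identifies who called.
  @impl true
  def handle_call(:who_am_i, {caller_pid, _tag}, state) do
    {:reply, caller_pid, state}
  end
end

{:ok, server} = CallerSketch.start_link(nil)
caller = GenServer.call(server, :who_am_i)
same_process? = caller == self()
```

This is why the leader needs no explicit trader identifier in the message: the pid of the calling trader comes for free with every call.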
The other callbacks that we need to provide are two handle_info/2 functions that will
handle the trade finished scenario as well as a crashed trader.
First, the trade finished scenario. As previously, we will try to find the trader data in the traders list. If
that’s successful, we will start a new trader with a fresh state. We will also overwrite the existing trader
data locally (as pid, ref and state changed):
# /apps/naive/lib/naive/[Link]
# below state updated handle_call callback
def handle_info(
      {:DOWN, _ref, :process, trader_pid, :normal},
      %{traders: traders, symbol: symbol, settings: settings} = state
    ) do
  [Link]("#{symbol} trader finished trade - restarting")

  case Enum.find_index(traders, &(&[Link] == trader_pid)) do
    nil ->
      [Link](
        "Tried to restart finished #{symbol} " <>
          "trader that leader is not aware of"
      )

      {:noreply, state}

    index ->
      new_trader_data = start_new_trader(fresh_trader_state(settings))
      new_traders = List.replace_at(traders, index, new_trader_data)

      {:noreply, %{state | traders: new_traders}}
  end
end
Here we will assume that whenever the reason that the [Link] process died is :normal that
means that we stopped it after trade cycle finished.
Final callback that we need to provide will handle the scenario where the trader crashed. We would
like to find the cached state of the crashed trader and start a new one with the same state and then
update the local cache as pid and ref will change for that trader:
# /apps/naive/lib/naive/[Link]
# below trade finished handle_info callback
def handle_info(
      {:DOWN, _ref, :process, trader_pid, _reason},
      %{traders: traders, symbol: symbol} = state
    ) do
  [Link]("#{symbol} trader died - trying to restart")

  case Enum.find_index(traders, &(&[Link] == trader_pid)) do
    nil ->
      [Link](
        "Tried to restart #{symbol} trader " <>
          "but failed to find its cached state"
      )

      {:noreply, state}

    index ->
      trader_data = [Link](traders, index)
      new_trader_data = start_new_trader(trader_data.state)
      new_traders = List.replace_at(traders, index, new_trader_data)

      {:noreply, %{state | traders: new_traders}}
  end
end
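The distinction between the two :DOWN clauses can be exercised outside of a GenServer. In this sketch (hypothetical module name, spawn_monitor/1 instead of a supervised trader) a process that returns normally produces the :normal reason, while any other exit reason falls through to the second clause:

```elixir
defmodule DownSketch do
  # Mirrors the two handle_info/2 clauses: :normal means the work finished,
  # anything else is treated as a crash. Clause order matters here.
  def classify({:DOWN, _ref, :process, _pid, :normal}), do: :finished
  def classify({:DOWN, _ref, :process, _pid, _reason}), do: :crashed

  def run(fun) do
    # spawn_monitor/1 sets up the monitor atomically with the spawn.
    {_pid, ref} = spawn_monitor(fun)

    receive do
      {:DOWN, ^ref, :process, _pid, _reason} = message -> classify(message)
    end
  end
end

finished = DownSketch.run(fn -> :ok end)
crashed = DownSketch.run(fn -> exit(:boom) end)
```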
IEx testing
That finishes the implementation part, let’s jump into iex session to see how it works.
We will start the observer first, then we will start trading on any valid symbol.
Once our trader has started, you should be able to right-click it in the observer, select “Kill process” (leave the reason
as kill) and click “OK”. At that moment you should see that the PID of the trader changed, and we
can also see a log message from the leader.
$ iex -S mix
...
iex(1)> :[Link]()
:ok
iex(2)> Naive.start_trading("xrpusdt")

[Link].041 [info] Starting new supervision tree to trade on XRPUSDT
{:ok, #PID<0.455.0>}
[Link].697 [info] Initializing new trader for XRPUSDT
iex(3)>
[Link].476 [error] XRPUSDT trader died - trying to restart
[Link].476 [info] Initializing new trader for XRPUSDT
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github²⁷
²⁷[Link]
Chapter 6 - Introduce a buy_down_interval to make a single trader more profitable
Objectives
• present reasons why to introduce buy_down_interval
• add buy_down interval to [Link]’s state and calculate buy price
• add buy_down interval to [Link]’s state compiled by the [Link]
• manually test the implementation inside iex
The [Link] process(marked in above diagram with blue color) at the arrival of the first trade
event, immediately places a buy order at the current price. At the moment when buy order gets
filled, it places sell order which later also gets filled.
Trader A exits and a new Trader B is started, which again immediately places a buy order at
the same price as the previous trader just sold. When this gets filled, a sell order gets placed and the loop
continues on and on.
We can see that there’s a problem here as we just paid a fee twice(once for selling by the Trader A
and once for buying by the Trader B) without really gaining anything(the Trader A could just hold
the currency and could simply cash in on double profit in this specific situation).
The solution is to be more clever about our buy order’s price. The idea is simple: instead of placing a
new buy order at the current price (the price from the last TradeEvent), we will introduce a
buy_down_interval:
So every new [Link] process, as it receives its first trade event, will take the event’s price,
calculate a decreased price using the buy_down_interval value (for example, 0.005 would be
0.5%) and place a buy order at that calculated price.
When looking at the chart above, we can figure out that buy_down_interval should never be smaller
than double the fee (at the moment of writing the transaction fee is 0.1%) that you are paying per
transaction.
[Link] implementation
# /apps/naive/lib/naive/[Link]
...
defmodule State do
  @enforce_keys [
    :symbol,
    :buy_down_interval, # <= add this line
    :profit_interval,
    :tick_size
  ]
  defstruct [
    :symbol,
    :buy_order,
    :sell_order,
    :buy_down_interval, # <= add this line
    :profit_interval,
    :tick_size
  ]
end
...
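One detail worth keeping in mind when adding a key to @enforce_keys: the check applies to the struct literal syntax and to struct!/2, but not to struct/2. The sketch below uses a hypothetical StateSketch module with fewer fields than the real state to show the difference.

```elixir
defmodule StateSketch do
  @enforce_keys [:symbol, :buy_down_interval]
  defstruct [:symbol, :buy_down_interval, :buy_order]
end

# Settings that are missing the newly enforced key.
settings = %{symbol: "XRPUSDT"}

# struct/2 silently leaves the missing enforced key as nil.
lenient = struct(StateSketch, settings)

# struct!/2 raises ArgumentError when an enforced key is missing.
strict =
  try do
    struct!(StateSketch, settings)
  rescue
    error in ArgumentError -> {:error, error.__struct__}
  end
```

If the state is built with struct/2, a forgotten setting will therefore surface later as a nil value rather than as an error at construction time.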
Next, we need to update the initial handle_info/2 callback which places the buy order. We need
to retrieve the buy_down_interval and the tick_size from the state of the trader to be able to
calculate the buy price. We will put the logic to calculate that price in a separate function at the end
of the file:
1 # /apps/naive/lib/naive/[Link]
2 ...
3 def handle_info(
4 %TradeEvent{price: price},
5 %State{
6 symbol: symbol,
7 buy_order: nil,
8 buy_down_interval: buy_down_interval, # <= add this line
9 tick_size: tick_size # <= add this line
10 } = state
11 ) do
12 price = calculate_buy_price(price, buy_down_interval, tick_size)
13 # ^ add above call
14 ...
To calculate the buy price we will use a method very similar to the one used
before to calculate the sell price. First, we will need to cast all variables
into Decimal structs and then we will simply subtract the buy_down_interval fraction of the price from
the price. The number that we end up with won't necessarily be a legal price, as every price
needs to be divisible by the tick_size, which we will ensure in the last calculation:
1 # /apps/naive/lib/naive/[Link]
2 ...
3 defp calculate_buy_price(price, buy_down_interval, tick_size) do
4 current_price = [Link](price)
5
6 # not necessarily legal price
7 exact_buy_price =
8 [Link](
9 current_price,
10 [Link](current_price, buy_down_interval)
11 )
12
13 D.to_float(
14 [Link](
15 D.div_int(exact_buy_price, tick_size),
16 tick_size
17 )
18 )
19 end
20 ...
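The rounding step can be tried without any dependencies. The sketch below is a hypothetical stand-in for calculate_buy_price/3 that swaps Decimal for plain integers: prices and the tick size are assumed to be expressed in units of 0.00000001, and the interval in parts per million. The module name and the sample values are assumptions made for illustration:

```elixir
defmodule BuyPriceSketch do
  # Hypothetical stand-in for calculate_buy_price/3 that uses plain
  # integers instead of Decimal. Prices and tick_size are expressed in
  # units of 0.00000001 and the interval in parts per million.
  @ppm 1_000_000

  def calculate_buy_price(price, buy_down_interval_ppm, tick_size) do
    # Not necessarily a legal price yet.
    exact_buy_price = price - div(price * buy_down_interval_ppm, @ppm)

    # Round down to the nearest multiple of tick_size.
    div(exact_buy_price, tick_size) * tick_size
  end
end

# Price 0.15180000, interval 0.0001 (100 ppm), tick size 0.0001
IO.inspect(BuyPriceSketch.calculate_buy_price(15_180_000, 100, 10_000))
```

For these sample values the exact price is 15_178_482 units, which gets rounded down to 15_170_000 units (0.1517), a multiple of the tick size.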
[Link] implementation
Next we need to update the [Link] as it needs to add buy_down_interval to the [Link]’s
state:
1 # /apps/naive/lib/naive/[Link]
2 defp fetch_symbol_settings(symbol) do
3 ...
4
5 %{
6 chunks: 1,
7 buy_down_interval: [Link]("0.0001"), # <= add this line
8 # -0.12% for quick testing
9 profit_interval: [Link]("-0.0012"),
10 tick_size: tick_size
11 }
12 end
13 ...
IEx testing
That finishes the buy_down_interval implementation. We will jump into an iex session to see how it
works, but before that we will temporarily change the logging level to debug to see the current prices:
1 # config/[Link]
2 ...
3 config :logger,
4 level: :debug # <= updated for our manual test
5 ...
After starting the streaming we should start seeing log messages with current prices. As we updated
our implementation, the buy order should be placed below the current price, as is visible below:
1 $ iex -S mix
2 ...
3 iex(1)> Streamer.start_streaming("FLMUSDT")
4 {:ok, #PID<0.313.0>}
5 iex(2)> Naive.start_trading("FLMUSDT")
6 [Link].829 [info] Starting new supervision tree to trade on FLMUSDT
7 ...
8 [Link].755 [info] Initializing new trader for FLMUSDT
9 ...
10 [Link].000 [debug] Trade event received [email protected]
11 [Link].009 [info] Placing BUY order for FLMUSDT @ 0.1517, quantity: 100
As we can see, our [Link] process placed a buy order below the current price (based on the most
recent trade event received).
[Note] Please remember to revert the change to logger level as otherwise there’s too much noise in
the logs.
[Note 2] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github²⁸
²⁸[Link]
Chapter 7 - Introduce a trader budget
and calculating the quantity
Objectives
• fetch step_size
• append budget and step_size to the Trader’s state compiled by the Leader
• append budget and step_size to the Trader’s state
• calculate quantity
Fetch step_size
In the 2nd chapter we hardcoded the quantity to 100; it's time to refactor that. We will need the
step_size information from Binance, which we are already retrieving together with tick_size in the
exchangeInfo call (but not getting it out from the response). So we will rename the
fetch_tick_size/1 function to fetch_symbol_filters/1, which will allow us to return multiple
filters (tick_size and step_size) from that function.
1 # /apps/naive/lib/naive/[Link]
2 ...
3 defp fetch_symbol_settings(symbol) do
4 symbol_filters = fetch_symbol_filters(symbol) # <= updated fetch_tick_size
5
6 [Link](
7 %{
8 chunks: 1,
9 budget: 20,
10 buy_down_interval: [Link]("0.0001"),
11 # -0.12% for quick testing
12 profit_interval: [Link]("-0.0012")
13 },
14 symbol_filters
15 )
16 end
17
18 defp fetch_symbol_filters(symbol) do # <= updated fetch_tick_size
19 symbol_filters =
20 @binance_client.get_exchange_info()
21 |> elem(1)
22 |> [Link](:symbols)
23 |> [Link](&(&1["symbol"] == symbol))
24 |> [Link]("filters")
25
26 tick_size =
27 symbol_filters
28 |> [Link](&(&1["filterType"] == "PRICE_FILTER"))
29 |> [Link]("tickSize")
30 |> [Link]()
31
32 step_size =
33 symbol_filters
34 |> [Link](&(&1["filterType"] == "LOT_SIZE"))
35 |> [Link]("stepSize")
36 |> [Link]()
37
38 %{
39 tick_size: tick_size,
40 step_size: step_size
41 }
42 end
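The filter extraction and the merge into the settings can be tried in isolation. The sketch below is a simplified illustration and not the book's code: sample_symbol/0 returns a hypothetical, trimmed-down entry shaped like the response used above (only the keys that the listing reads), the values are made up, and floats replace Decimal to keep the example free of dependencies:

```elixir
defmodule FiltersSketch do
  # Hypothetical, trimmed-down stand-in for one entry of the exchangeInfo
  # response; only the keys read by the listing above are included.
  def sample_symbol do
    %{
      "symbol" => "XRPUSDT",
      "filters" => [
        %{"filterType" => "PRICE_FILTER", "tickSize" => "0.00010000"},
        %{"filterType" => "LOT_SIZE", "stepSize" => "0.10000000"}
      ]
    }
  end

  def fetch_symbol_filters(symbol_info) do
    filters = Map.get(symbol_info, "filters")

    %{
      tick_size: filter_value(filters, "PRICE_FILTER", "tickSize"),
      step_size: filter_value(filters, "LOT_SIZE", "stepSize")
    }
  end

  # Finds the filter of the given type and parses one of its fields.
  # Floats are used here only to avoid the Decimal dependency.
  defp filter_value(filters, type, key) do
    filters
    |> Enum.find(&(&1["filterType"] == type))
    |> Map.get(key)
    |> String.to_float()
  end
end

# Merging the filters into the hardcoded settings in one go
settings =
  Map.merge(
    %{chunks: 1, budget: 20},
    FiltersSketch.fetch_symbol_filters(FiltersSketch.sample_symbol())
  )

IO.inspect(settings)
```

Returning a map from the function means that adding another filter later only requires one more key, without touching the call site that merges the settings.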
Instead of reassigning the filters one by one into the settings, we will merge them together (#1).
Additionally, we will introduce a budget (#2), which will be shared across all traders of the symbol.
Also, we don't need to assign tick_size here as it's part of the settings that are merged.
1 # /apps/naive/lib/naive/[Link]
2 defp fresh_trader_state(settings) do
3 %{
4 struct([Link], settings) |
5 budget: [Link]([Link], [Link])
6 }
7 end
In the code above we are using the Decimal module(aliased as D) to calculate the budget - we need
to alias it at the top of [Link]’s file:
1 # /apps/naive/lib/naive/[Link]
2 defmodule [Link] do
3 use GenServer
4
5 alias Decimal, as: D # <= add this line
6 alias [Link]
7 ...
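The fresh_trader_state/1 function shown earlier combines struct/2 with the map update syntax. Here is a minimal sketch of that combination, assuming a shortened, hypothetical struct (TraderStateSketch) and plain numbers instead of Decimal:

```elixir
defmodule TraderStateSketch do
  # Hypothetical, shortened version of the trader's State struct.
  defstruct [:symbol, :budget, :tick_size, :step_size]
end

defmodule LeaderSketch do
  def fresh_trader_state(settings) do
    # struct/2 copies the keys that exist in the struct and silently
    # drops the rest (here: :chunks). The budget is then overridden
    # with the per-trader share.
    %{
      struct(TraderStateSketch, settings)
      | budget: settings.budget / settings.chunks
    }
  end
end

settings = %{symbol: "XRPUSDT", chunks: 4, budget: 100, tick_size: 0.0001, step_size: 0.1}
IO.inspect(LeaderSketch.fresh_trader_state(settings))
```

The settings map can therefore carry leader-only keys such as chunks without them leaking into the trader's state.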
1 # /apps/naive/lib/naive/[Link]
2 ...
3 defmodule State do
4 @enforce_keys [
5 :symbol,
6 :budget, # <= add this line
7 :buy_down_interval,
8 :profit_interval,
9 :tick_size,
10 :step_size # <= add this line and comma above
11 ]
12 defstruct [
13 :symbol,
14 :budget, # <= add this line
15 :buy_order,
16 :sell_order,
17 :buy_down_interval,
18 :profit_interval,
19 :tick_size,
20 :step_size # <= add this line and comma above
21 ]
22 end
23 ...
Calculate quantity
Jumping back to the handle_info/2 where the [Link] places a buy order, we need to pattern
match on the step_size and budget; then we will be able to swap the hardcoded quantity with the
result of calling the calculate_quantity/3 function:
1 # /apps/naive/lib/naive/[Link]
2 ...
3 def handle_info(
4 %TradeEvent{price: price},
5 %State{
6 symbol: symbol,
7 budget: budget, # <= add this line
8 buy_order: nil,
9 buy_down_interval: buy_down_interval,
10 tick_size: tick_size,
11 step_size: step_size # <= add this line
12 } = state
13 ) do
14 ...
15 quantity = calculate_quantity(budget, price, step_size)
16 ...
To calculate the quantity we will just divide the budget by the price, with the caveat that (as
when calculating the price) the result may not be a legal quantity value, as it needs to be divisible by step_size:
1 # /apps/naive/lib/naive/[Link]
2 # add below at the bottom of the file
3 ...
4 defp calculate_quantity(budget, price, step_size) do
5 price = D.from_float(price)
6
7 # not necessarily legal quantity
8 exact_target_quantity = [Link](budget, price)
9
10 D.to_float(
11 [Link](
12 D.div_int(exact_target_quantity, step_size),
13 step_size
14 )
15 )
16 end
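As with the price, the rounding of the quantity can be checked with plain integers. The sketch below is a hypothetical stand-in for calculate_quantity/3 in which every amount (budget, price, step size and the resulting quantity) is assumed to be expressed in units of 0.00000001; it does not use Decimal:

```elixir
defmodule QuantitySketch do
  # Hypothetical stand-in for calculate_quantity/3 using integers instead
  # of Decimal. Every amount is expressed in units of 0.00000001.
  @scale 100_000_000

  def calculate_quantity(budget, price, step_size) do
    # Not necessarily a legal quantity yet.
    exact_target_quantity = div(budget * @scale, price)

    # Round down to the nearest multiple of step_size.
    div(exact_target_quantity, step_size) * step_size
  end
end

# Budget 20, price 0.29506, step size 0.1
IO.inspect(QuantitySketch.calculate_quantity(2_000_000_000, 29_506_000, 10_000_000))
```

For these sample values the result is 6_770_000_000 units, that is 67.7, the largest multiple of 0.1 that the budget of 20 can buy at 0.29506.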
IEx testing
That finishes the quantity (and budget) implementation. We will jump into an iex session to see how it
works.
First, start the streaming and trading on the same symbol; a moment later you should see a variable
quantity that more or less uses the full allowed budget:
1 $ iex -S mix
2 ...
3 iex(1)> Streamer.start_streaming("XRPUSDT")
4 {:ok, #PID<0.313.0>}
5 iex(2)> Naive.start_trading("XRPUSDT")
6 [Link].829 [info] Starting new supervision tree to trade on XRPUSDT
7 [Link].755 [info] Initializing new trader for XRPUSDT
8 [Link].009 [info] Placing BUY order for XRPUSDT @ 0.29506, quantity: 67.7
9 [Link].456 [info] Buy order filled, placing SELL order for XRPUSDT @ 0.29529, qua\
10 ntity: 67.7
As we can see, our [Link] process is now buying and selling based on the passed budget.
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github²⁹
²⁹[Link]
Chapter 8 - Add support for multiple
transactions per order
Objectives
• describe issue with current implementation
• improve buy order filled callback
• implement buy order “filled” callback
• improve sell order callback
Here we can see our buy order for 1000 units (on the left) and another trader's sell order (on the right)
for 1000 units. This (an order fully filled in a single transaction) is the case most of the time, but it's not
ALWAYS the case.
Sometimes our order will be filled by two or more transactions:
The easiest and safest way to check whether this event has filled our order fully is to fetch our order again
from Binance at the moment when the trade event filling our order arrives.
The problem with this approach is that sometimes we will run into a race condition:
From the left: first, we send a buy order for quantity 1000 to Binance. It hangs for a while
until it gets filled by 2 transactions that happen very quickly, quickly enough for us to receive
both messages at almost the same moment.
When our bot handles the first one, it will fetch the buy order, which is already filled. This will
cause the trader to place a sell order, but there's another trade event waiting in the message
box. It will be handled by another callback that will again fetch the order and place another sell
order, and that's obviously not correct.
What we need to do is update the status of the buy order after the first fetch (if it's filled), so that when
the second trade event arrives we will ignore it (this will require an additional callback).
The same issue will appear when placing a sell order and dealing with multiple simultaneous transactions.
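The idea can be modelled without a GenServer or the Binance client. In the simplified sketch below, handle_event/3 is a hypothetical pure function that plays the role of the trader's callbacks, plain maps replace the structs, and fetch_status is an assumed stand-in for asking the exchange about the order. The clause order matters: the "already filled" clause comes first so that a late event is ignored.

```elixir
defmodule RaceSketch do
  # Hypothetical, simplified model of the trader's callbacks.
  # `fetch_status` stands in for asking the exchange about the order.

  # Clause 1: the order is already known to be filled and a sell order
  # exists, so a late trade event for the same order is ignored.
  def handle_event(
        %{buyer_order_id: id},
        %{buy_order: %{order_id: id, status: "FILLED"}, sell_order: sell} = state,
        _fetch_status
      )
      when not is_nil(sell) do
    state
  end

  # Clause 2: refresh the status and place the sell order only once.
  def handle_event(
        %{buyer_order_id: id},
        %{buy_order: %{order_id: id} = buy_order} = state,
        fetch_status
      ) do
    buy_order = %{buy_order | status: fetch_status.(id)}

    if buy_order.status == "FILLED" do
      %{state | buy_order: buy_order, sell_order: :placed, sells: state.sells + 1}
    else
      %{state | buy_order: buy_order}
    end
  end
end

state = %{buy_order: %{order_id: 1, status: "NEW"}, sell_order: nil, sells: 0}
# Both fills happened before the first event gets handled.
fetch_status = fn _order_id -> "FILLED" end

final =
  Enum.reduce([%{buyer_order_id: 1}, %{buyer_order_id: 1}], state, fn event, acc ->
    RaceSketch.handle_event(event, acc, fetch_status)
  end)

IO.inspect(final.sells)
```

Although two trade events arrive for the same order, only the first one places a sell order; the second one matches the first clause and leaves the state untouched.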
1 # /apps/naive/lib/naive/[Link]
2 def handle_info(
3 %TradeEvent{
4 buyer_order_id: order_id # <= quantity got removed from here
5 },
6 %State{
7 symbol: symbol,
8 buy_order:
9 %[Link]{
10 price: buy_price,
11 order_id: order_id,
12 orig_qty: quantity,
13 transact_time: timestamp # <= timestamp added to query order
14 } = buy_order, # <= buy order to update it
15 profit_interval: profit_interval,
16 tick_size: tick_size
17 } = state
18 ) do
Now we can fetch our buy order to check whether it is already filled. We will get the [Link] struct
instead of the [Link] that we normally deal with. At this moment we will simply
update our [Link] struct from the state:
1 # /apps/naive/lib/naive/[Link]
2 # inside the same callback
3 def handle_info(
4 ...
5 ) do
6 {:ok, %[Link]{} = current_buy_order} =
7 @binance_client.get_order(
8 symbol,
9 timestamp,
10 order_id
11 )
12
13 buy_order = %{buy_order | status: current_buy_order.status}
14 ...
The rest of the logic inside this callback will depend on the status of the buy order. If our buy
order is "filled", we would like to follow the existing logic but also update the buy_order field inside the
state of the trader process. On the other hand, if our order is not yet filled, the only thing to do is to
update the buy_order field inside the state of the Trader process. Here's the updated body incorporating the above
changes (a few variables got renamed for clarity as we are now fetching the order):
1 # /apps/naive/lib/naive/[Link]
2 # inside the same callback
3 buy_order = ....
4
5 {:ok, new_state} =
6 if buy_order.status == "FILLED" do
7 sell_price = calculate_sell_price(buy_price, profit_interval, tick_size)
8
9 [Link](
10 "Buy order filled, placing SELL order for " <>
11 "#{symbol} @ #{sell_price}, quantity: #{quantity}"
12 )
13
14 {:ok, %[Link]{} = order} =
15 @binance_client.order_limit_sell(symbol, quantity, sell_price, "GTC")
16
17 {:ok, %{state | buy_order: buy_order, sell_order: order}}
18 else
19 [Link]("Buy order partially filled")
20 {:ok, %{state | buy_order: buy_order}}
21 end
22
23 [Link](:trader_state_updated, new_state)
24 {:noreply, new_state}
25 end
As we are branching our logic and both paths are updating the state, we will return it together with
an :ok atom to be able to pattern match it and assign as a new state.
1 # /apps/naive/lib/naive/[Link]
2 # place this callback ABOVE callback from previous section
3 def handle_info(
4 %[Link]{
5 buyer_order_id: order_id
6 },
7 %State{
8 buy_order: %[Link]{
9 order_id: order_id, # <= confirms that it's event for buy order
10 status: "FILLED" # <= confirms buy order filled
11 },
12 sell_order: %[Link]{} # <= confirms sell order placed
13 } = state
14 ) do
15 {:noreply, state}
16 end
• drop both pattern matches on quantity, as we already know that a trade event could partially
fill our order (#1)
1 # /apps/naive/lib/naive/[Link]
2 def handle_info(
3 %TradeEvent{
4 seller_order_id: order_id # `quantity` check removed below (#1)
5 },
6 %State{
7 symbol: symbol, # (#2)
8 sell_order:
9 %[Link]{
10 order_id: order_id,
11 transact_time: timestamp # `transact_time` to `get_order` (#3)
12 } = sell_order # to update order (#4)
13 } = state
14 ) do
1 # /apps/naive/lib/naive/[Link]
2 # inside the callback
3 {:ok, %[Link]{} = current_sell_order} =
4 @binance_client.get_order(
5 symbol,
6 timestamp,
7 order_id
8 )
9
10 sell_order = %{sell_order | status: current_sell_order.status}
11
12 if sell_order.status == "FILLED" do
13 [Link]("Trade finished, trader will now exit")
14 {:stop, :normal, state}
15 else
16 [Link]("Sell order partially filled")
17 new_state = %{state | sell_order: sell_order}
18 {:noreply, new_state}
19 end
1 $ iex -S mix
2 ...
3 iex(1)> Naive.start_trading("XRPUSDT")
4 [Link].977 [info] Starting new supervision tree to trade on XRPUSDT
5 {:ok, #PID<0.331.0>}
6 [Link].073 [info] Initializing new trader for XRPUSDT
7 iex(2)> Streamer.start_streaming("XRPUSDT")
8 {:ok, #PID<0.345.0>}
9 [Link].044 [info] Initializing new trader for XRPUSDT
10 [Link].888 [info] Placing BUY order for XRPUSDT @ 0.28031, quantity: 71.3
11 [Link].023 [info] Buy order filled, placing SELL order for XRPUSDT @ 0.28053, qua\
12 ntity: 71.30000000
13 [Link].865 [info] Trade finished, trader will now exit
14 [Link].865 [info] XRPUSDT Trader finished - restarting
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github³⁰
³⁰[Link]
Chapter 9 - Run multiple traders in
parallel
Objectives
• describe and design the required functionality
• implement rebuy in the [Link]
• implement rebuy in the [Link]
• improve logs by assigning ids to traders
1 # /apps/naive/lib/naive/[Link]
2 ...
3 traders =
4 for _i <- 1..[Link],
5 do: start_new_trader(trader_state)
6 ...
Single trader
We will implement an additional trade event callback inside the [Link] that will keep checking
the price after the buy order has been filled. Whenever the price drops below the buy_order's price by
rebuy_interval, we will notify the [Link] to start a new [Link] process:
Multiple traders
The [Link] keeps track of how many [Link] are running and needs to honor the
number of chunks set up in the settings (one chunk == one trader).
To stop the [Link] from continuously notifying about a drop in the price, we will also
introduce a boolean flag that will track whether the [Link] has already been notified.
1 # /apps/naive/lib/naive/[Link]
2 ...
3 defmodule State do
4 @enforce_keys [
5 :symbol,
6 :budget,
7 :buy_down_interval,
8 :profit_interval,
9 :rebuy_interval, # <= add this field
10 :rebuy_notified, # <= add this field
11 :tick_size,
12 :step_size
13 ]
14 defstruct [
15 :symbol,
16 :budget,
17 :buy_order,
18 :sell_order,
19 :buy_down_interval,
20 :profit_interval,
21 :rebuy_interval, # <= add this field
22 :rebuy_notified, # <= add this field
23 :tick_size,
24 :step_size
25 ]
26 end
The rebuy logic should be placed almost as the last callback, just before the one that ignores all events. We
will need to retrieve the current price and buy_price and confirm that we haven't notified the leader
yet (rebuy_notified flag):
1 # /apps/naive/lib/naive/[Link]
2 ...
3 # sell filled callback here
4 ...
5 def handle_info(
6 %TradeEvent{
7 price: current_price
8 },
9 %State{
10 symbol: symbol,
11 buy_order: %[Link]{
12 price: buy_price
13 },
14 rebuy_interval: rebuy_interval,
15 rebuy_notified: false
16 } = state
17 ) do
18
19 end
20
21 # catch all callback here
We need to calculate whether the current price is below the rebuy interval. If yes, we will notify the leader
and update the boolean flag. We will abstract the calculation into a separate function (for readability) that
we will write below:
1 # /apps/naive/lib/naive/[Link]
2 # body of the above callback
3 if trigger_rebuy?(buy_price, current_price, rebuy_interval) do
4 [Link]("Rebuy triggered for #{symbol} trader")
5 new_state = %{state | rebuy_notified: true}
6 [Link](:rebuy_triggered, new_state)
7 {:noreply, new_state}
8 else
9 {:noreply, state}
10 end
As mentioned before, we will set the rebuy_notified boolean flag to true and notify the leader using
the notify function with dedicated atom.
At the bottom of the module we need to add trigger_rebuy? helper function:
1 # /apps/naive/lib/naive/[Link]
2 # bottom of the module
3 defp trigger_rebuy?(buy_price, current_price, rebuy_interval) do
4 current_price = [Link](current_price)
5 buy_price = [Link](buy_price)
6
7 rebuy_price =
8 [Link](
9 buy_price,
10 [Link](buy_price, rebuy_interval)
11 )
12
13 [Link]?(current_price, rebuy_price)
14 end
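The price check and the notify-once flag can be exercised together in a small, dependency-free sketch. It is a simplified illustration rather than the book's code: RebuySketch is a hypothetical module, prices are assumed to be integers in units of 0.01, the interval is given in parts per million, and count_notifications/3 replays a list of prices to show how the flag limits notifications.

```elixir
defmodule RebuySketch do
  # Hypothetical stand-in for trigger_rebuy?/3 using integers instead of
  # Decimal: prices are in units of 0.01, the interval in parts per million.
  @ppm 1_000_000

  def trigger_rebuy?(buy_price, current_price, rebuy_interval_ppm) do
    rebuy_price = buy_price - div(buy_price * rebuy_interval_ppm, @ppm)
    current_price < rebuy_price
  end

  # Feeds a list of prices through the check and counts how many times
  # the leader would be notified; the flag keeps it to at most once.
  def count_notifications(prices, buy_price, rebuy_interval_ppm) do
    {count, _notified} =
      Enum.reduce(prices, {0, false}, fn price, {count, notified} ->
        if not notified and trigger_rebuy?(buy_price, price, rebuy_interval_ppm) do
          {count + 1, true}
        else
          {count, notified}
        end
      end)

    count
  end
end

# Bought at 37177.90 with a 0.1% (1000 ppm) rebuy interval
IO.inspect(RebuySketch.trigger_rebuy?(3_717_790, 3_713_628, 1_000))
IO.inspect(RebuySketch.count_notifications([3_717_000, 3_713_628, 3_713_000], 3_717_790, 1_000))
```

With these sample values the rebuy price is 37140.73, so 37136.28 triggers a rebuy while 37170.00 does not, and the later, even lower price does not produce a second notification.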
1 # /apps/naive/lib/naive/[Link]
2 def handle_continue(:start_traders, %{symbol: symbol} = state) do
3 ...
4 traders = [start_new_trader(trader_state)] # <= updated part
5
6 ...
7 end
We will need to add a new clause of notify function that will handle the rebuy scenario:
1 # /apps/naive/lib/naive/[Link]
2 # add below current `notify` function
3 def notify(:rebuy_triggered, trader_state) do
4 [Link](
5 :"#{__MODULE__}-#{trader_state.symbol}",
6 {:rebuy_triggered, trader_state}
7 )
8 end
We need to add a new handle_call function that will start new traders only when there are still
chunks available (enforcing the maximum number of parallel traders). Let's start with a header:
1 # /apps/naive/lib/naive/[Link]
2 # place this one after :update_trader_state handle_call
3 def handle_call(
4 {:rebuy_triggered, new_trader_state},
5 {trader_pid, _},
6 %{traders: traders, symbol: symbol, settings: settings} = state
7 ) do
8
9 end
• we need the trader's pid to be able to find its details inside the list of traders
• we need the settings to confirm the number of chunks to be able to limit the maximum number of
parallel traders
Moving on to the body of our callback. As with the other ones, we will check whether we can find the
trader inside the list of traders, and based on that we will either start another one (if we didn't
reach the limit of chunks) or ignore it:
1 # /apps/naive/lib/naive/[Link]
2 # body of our callback
3 case Enum.find_index(traders, &(&[Link] == trader_pid)) do
4 nil ->
5 [Link]("Rebuy triggered by trader that leader is not aware of")
6 {:reply, :ok, state}
7
8 index ->
9 traders =
10 if [Link] == length(traders) do
11 [Link]("All traders already started for #{symbol}")
12 traders
13 else
14 [Link]("Starting new trader for #{symbol}")
15 traders ++ [start_new_trader(fresh_trader_state(settings))] # appended so that index stays valid
16 end
17
18 old_trader_data = [Link](traders, index)
19 new_trader_data = %{old_trader_data | :state => new_trader_state}
20 new_traders = List.replace_at(traders, index, new_trader_data)
21
22 {:reply, :ok, %{state | :traders => new_traders}}
23 end
In the above code we need to remember to update the state of the trader that triggered the rebuy inside
the traders list, as well as add the state of a new trader to that list.
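The list handling is easy to get wrong, because adding a trader at the front of the list shifts the position of every existing trader. The sketch below shows one way to keep the two steps apart, as a hypothetical pure function: rebuy/5, the atoms used in place of pids, and the start_trader function are all assumptions made for illustration. It updates the notifying trader first and only then adds the new one.

```elixir
defmodule LeaderRebuySketch do
  # Hypothetical, pure version of the leader's rebuy handling.
  # `start_trader` stands in for start_new_trader/1 and returns the data
  # the leader keeps for a trader.
  def rebuy(traders, trader_pid, new_trader_state, chunks, start_trader) do
    case Enum.find_index(traders, &(&1.pid == trader_pid)) do
      nil ->
        # Unknown trader: nothing changes.
        traders

      index ->
        # Record the new state of the trader that triggered the rebuy.
        updated =
          List.update_at(traders, index, fn trader ->
            %{trader | state: new_trader_state}
          end)

        # Start another trader only while there are unused chunks.
        if length(updated) < chunks do
          [start_trader.() | updated]
        else
          updated
        end
    end
  end
end

traders = [%{pid: :trader_a, state: %{rebuy_notified: false}}]
start_trader = fn -> %{pid: :trader_b, state: %{rebuy_notified: false}} end

traders =
  LeaderRebuySketch.rebuy(traders, :trader_a, %{rebuy_notified: true}, 2, start_trader)

IO.inspect(traders)
```

Because the state is updated before the list grows, the index found at the start always refers to the trader that sent the notification.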
As with the other settings, we will hardcode the rebuy_interval (inside the fetch_symbol_settings
function) and assign it to the trader's state (inside the fresh_trader_state function):
1 # /apps/naive/lib/naive/[Link]
2 defp fresh_trader_state(settings) do
3 %{
4 struct([Link], settings)
5 | budget: [Link]([Link], [Link]),
6 rebuy_notified: false # <= add this line
7 }
8 end
9
10 defp fetch_symbol_settings(symbol) do
11 ...
12
13 [Link](
14 %{
15 ...
16 chunks: 5, # <= update to 5 parallel traders max
17 budget: [Link]("100"), # <= update this line
18 ...
19 profit_interval: [Link]("-0.0012"),
20 rebuy_interval: [Link]("0.001") # <= add this line
21 },
22 symbol_filters
23 )
24 end
We also update the chunks and the budget to allow our strategy to start up to 5 parallel traders with a
budget of 20 USDT each (100/5), as Binance has a minimal order requirement of about $15 (when using
the BinanceMock this doesn't really matter).
1 # /apps/naive/lib/naive/[Link]
2 defp fresh_trader_state(settings) do
3 %{
4 struct([Link], settings)
5 | id: :os.system_time(:millisecond), # <= add this line
6 budget: [Link]([Link], [Link]),
7 rebuy_notified: false
8 }
9 end
Now we can move on to the [Link] and add it to the %State{} struct, and we will modify
every callback to include that id inside the log messages:
1 # /apps/naive/lib/naive/[Link]
2 defmodule State do
3 @enforce_keys [
4 :id,
5 ...
6 ]
7 defstruct [
8 :id,
9 ...
10 ]
11 end
12
13 ...
14
15 def init(%State{id: id, symbol: symbol} = state) do
16 ...
17
18 [Link]("Initializing new trader(#{id}) for #{symbol}")
19
20 ...
21 end
22
23 def handle_info(
24 %TradeEvent{price: price},
25 %State{
26 id: id,
27 ...
28 } = state
29 ) do
30 ...
31
32 [Link](
33 "The trader(#{id}) is placing a BUY order " <>
34 "for #{symbol} @ #{price}, quantity: #{quantity}"
35 )
36
37 ...
38 end
39
40 def handle_info(
41 %TradeEvent{
42 buyer_order_id: order_id
43 },
44 %State{
45 id: id,
46 ...
47 } = state
48 ) do
49 ...
50 [Link](
51 "The trader(#{id}) is placing a SELL order for " <>
52 "#{symbol} @ #{sell_price}, quantity: #{quantity}."
53 )
54 ...
55 [Link]("Trader's(#{id}) #{symbol} BUY order got partially filled")
56 ...
57 end
58
59 def handle_info(
60 %TradeEvent{
61 seller_order_id: order_id
62 },
63 %State{
64 id: id,
65 ...
66 } = state
67 ) do
68 ...
69 [Link]("Trader(#{id}) finished trade cycle for #{symbol}")
70 ...
71 [Link]("Trader's(#{id}) #{symbol} SELL order got partially filled")
72 ...
73 end
74
75 def handle_info(
76 %TradeEvent{
77 price: current_price
78 },
79 %State{
80 id: id,
81 ...
82 } = state
83 ) do
84 ...
85 [Link]("Rebuy triggered for #{symbol} by the trader(#{id})")
86 ...
87 end
That finishes the implementation part. We should now be able to test the implementation and see
a dynamically growing number of traders for our strategy based on price movement.
1 $ iex -S mix
2 ...
3 iex(1)> :[Link]()
4 ...
5 [Link].223 [info] Starting new supervision tree to trade on BTCUSDT
6 {:ok, #PID<0.338.0>}
7 [Link].055 [info] Initializing new trader(1610973417051) for BTCUSDT
8 iex(6)> Streamer.start_streaming("BTCUSDT")
9 {:ok, #PID<0.352.0>}
10 [Link].176 [info] The trader(1610973417051) is placing a BUY order for BTCUSDT @ \
11 37177.9, quantity: 5.37e-4
12 [Link].426 [info] The trader(1610973417051) is placing a SELL order for BTCUSDT @\
13 37207.59, quantity: 5.37e-4.
14 [Link].298 [info] Rebuy triggered for BTCUSDT by the trader(1610973417051)
15 [Link].299 [info] Starting new trader for BTCUSDT
16 [Link].299 [info] Initializing new trader(1610973431299) for BTCUSDT
17 [Link].305 [info] The trader(1610973431299) is placing a BUY order for BTCUSDT @ \
18 37136.28, quantity: 5.38e-4
We can clearly see that our strategy dynamically scaled from 1 to 5 parallel traders and they were
going through different trading steps without any problems - I think that’s really cool to see and it
wasn’t difficult to achieve in Elixir.
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github³¹
³¹[Link]
Chapter 10 - Fine-tune trading
strategy per symbol
Objectives
• describe and design the required functionality
• add docker to project
• set up ecto inside the naive app
• create and migrate the DB
• seed symbols’ settings
• update the [Link] to fetch settings
1 # /apps/naive/lib/naive/[Link]
2 ...
3 defp fetch_symbol_settings(symbol) do
4 symbol_filters = fetch_symbol_filters(symbol)
5
6 [Link](
7 %{
8 symbol: symbol, # <=
9 chunks: 5, # <=
10 budget: [Link]("100"), # <=
11 buy_down_interval: [Link]("0.0001"), # <= all of those settings
12 # -0.12% for quick testing # <=
13 profit_interval: [Link]("-0.0012"), # <=
14 rebuy_interval: [Link]("0.001") # <=
15 },
16 symbol_filters
17 )
18 end
19 ...
The problem with those settings is that they are hardcoded and there's no flexibility to define them per
symbol at the moment.
In this chapter, we will move them out of this file into the Postgres database.
1 # /[Link]
2 version: "3.2"
3 services:
4 db:
5 image: postgres:latest
6 restart: always
7 environment:
8 POSTGRES_PASSWORD: "postgres"
9 ports:
10 - 5432:5432
11 volumes:
12 - ../postgres-data:/var/lib/postgresql/data
If you are new to docker here’s the gist of what the above will do:
1 $ docker-compose up -d
2 Creating hedgehog_db_1 ... done
1 $ docker ps -a
2 CONTAINER ID IMAGE COMMAND CREATED STATUS \
3 PORTS NAMES
4 98558827b80b postgres:latest "docker-entrypoint.s…" 4 seconds ago Up 4 sec\
5 onds [Link]:5432->5432/tcp hedgehog_db_1
1 # /apps/naive/[Link]
2 defp deps do
3 [
4 {:binance, "~> 0.7.1"},
5 {:binance_mock, in_umbrella: true},
6 {:decimal, "~> 2.0"},
7 {:ecto_sql, "~> 3.0"}, # <= New line
8 {:ecto_enum, "~> 1.4"}, # <= New line
9 {:phoenix_pubsub, "~> 2.0"},
10 {:postgrex, ">= 0.0.0"}, # <= New line
11 {:streamer, in_umbrella: true}
12 ]
13 end
1 $ mix [Link]
We can now use the ecto generator to add an ecto repository to the Naive application:
1 $ cd apps/naive
2 $ mix [Link] -r [Link]
3 * creating lib/naive
4 * creating lib/naive/[Link]
5 * updating ../../config/[Link]
6 Don't forget to add your new repo to your supervision tree
7 (typically in lib/naive/[Link]):
8
9 {[Link], []}
10
11 And to add it to the list of Ecto repositories in your
12 configuration files (so Ecto tasks work as expected):
13
14 config :naive,
15 ecto_repos: [[Link]]
Back in the IDE, the generator updated our config/[Link] file with the default access details
to the database. We need to modify them to point to our Postgres docker instance, as well as add a
list of ecto repositories for our naive app (as per the instructions above):
1 # /config/[Link]
2 config :naive, # <= added line
3 ecto_repos: [[Link]], # <= added line
4 binance_client: BinanceMock # <= merged from existing config
5
6 config :naive, [Link],
7 database: "naive", # <= updated
8 username: "postgres", # <= updated
9 password: "postgres", # <= updated
10 hostname: "localhost"
Here we can use localhost as inside the [Link] file we defined port forwarding
from the container to the host(Postgres is available at localhost:5432). We also merged the existing
binance_client setting together with the new ecto_repos setting.
The last step to be able to communicate with the database using Ecto will be to add the [Link]
module(created by generator) to the children list of the [Link]:
1 # /apps/naive/lib/naive/[Link]
2 ...
3 def start(_type, _args) do
4 children = [
5 {[Link], []}, # <= added line
6 {
7 DynamicSupervisor,
8 strategy: :one_for_one, name: [Link]
9 }
10 ]
11 ...
We can now copy the current hardcoded settings from the [Link] module and use them as
a column list of our new settings table. All of the below alterations need to be done inside the
change function of our migration file:
1 # /apps/naive/priv/repo/migrations/20210202223209_create_settings.exs
2 ...
3 def change do
4 create table(:settings) do
5 add(:symbol, :text, null: false)
6 add(:chunks, :integer, null: false)
7 add(:budget, :decimal, null: false)
8 add(:buy_down_interval, :decimal, null: false)
9 add(:profit_interval, :decimal, null: false)
10 add(:rebuy_interval, :decimal, null: false)
11 end
12 end
At this moment we just copied the settings and converted them to columns using the add function.
We need now to take care of the id column. We need to pass primary_key: false to the create
table macro to stop it from creating the default integer-based id column. Instead of that we will
define the id column ourselves with :uuid type and pass a flag that will indicate that it’s the primary
key of the settings table:
1 # /apps/naive/priv/repo/migrations/20210202223209_create_settings.exs
2 ...
3 create table(:settings, primary_key: false) do
4 add(:id, :uuid, primary_key: true)
5 ...
We will also add create and update timestamps that come as a bundle when using the timestamps()
function inside the create table macro:
1 # /apps/naive/priv/repo/migrations/20210202223209_create_settings.exs
2 ...
3 create table(...) do
4 ...
5
6 timestamps() # <= both create and update timestamps
7 end
8 ...
We will add a unique index on the symbol column to avoid any possible duplicates:
1 # /apps/naive/priv/repo/migrations/20210202223209_create_settings.exs
2 ...
3 create table(...) do
4 ...
5 end
6
7 create(unique_index(:settings, [:symbol]))
8 end
9 ...
We will now add the status field, which will be an Enum. It will be defined inside a separate file
and aliased from our migration; this way we will be able to use it both from within the migration and
inside the lib code. First, we will apply changes to our migration and then we will move on to
creating the Enum module.
Here's the full implementation of the migration for reference:
1 # /apps/naive/priv/repo/migrations/20210202223209_create_settings.exs
2 defmodule [Link] do
3 use [Link]
4
5 alias [Link]
6
7 def change do
8 TradingStatusEnum.create_type()
9
10 create table(:settings, primary_key: false) do
11 add(:id, :uuid, primary_key: true)
12 add(:symbol, :text, null: false)
13 add(:chunks, :integer, null: false)
14 add(:budget, :decimal, null: false)
15 add(:buy_down_interval, :decimal, null: false)
16 add(:profit_interval, :decimal, null: false)
17 add(:rebuy_interval, :decimal, null: false)
18 add(:status, [Link](), default: "off", null: false)
19
20 timestamps()
21 end
22
23 create(unique_index(:settings, [:symbol]))
24 end
25 end
That finishes our work on the migration file. We will now focus on the TradingStatusEnum implementation.
First, we need to create a schema directory inside the apps/naive/lib/naive directory, then create a
file called trading_status_enum.ex in it and place the below logic (defining the enum) in that file:
1 # /apps/naive/lib/naive/schema/trading_status_enum.ex
2 import EctoEnum
3
4 defenum([Link], :trading_status, [:on, :off])
We used the defenum macro from the ecto_enum module to define our enum. It’s interesting to point
out that we didn’t need to define a new module as defenum macro takes care of that for us.
Let’s run the migration to create the table, unique index, and the enum:
1 $ mix [Link]
2 [Link].757 [info] == Running 20210202223209 [Link].\
3 change/0 forward
4 [Link].759 [info] execute "CREATE TYPE public.trading_status AS ENUM ('on', 'off'\
5 )"
6 [Link].760 [info] create table settings
7 [Link].820 [info] create index settings_symbol_index
8 [Link].829 [info] == Migrated 20210202223209 in 0.0s
We can now create a schema file for the settings table. Inside the /apps/naive/lib/naive/schema
directory, create a file called [Link]. We will start with a skeleton implementation of the schema file
together with the list of columns copied from the migration, converted to Ecto’s types using its docs³⁵:
1 # /apps/naive/lib/naive/schema/[Link]
2 defmodule [Link] do
3 use [Link]
4
5 alias [Link]
6
7 @primary_key {:id, :binary_id, autogenerate: true}
8
9 schema "settings" do
10 field(:symbol, :string)
11 field(:chunks, :integer)
12 field(:budget, :decimal)
13 field(:buy_down_interval, :decimal)
14 field(:profit_interval, :decimal)
15 field(:rebuy_interval, :decimal)
16 field(:status, TradingStatusEnum)
17
18 timestamps()
19 end
20 end
Let’s start by adding those default values to the config file (we will merge them into the structure
defining binance_client and ecto_repos):
1 # config/[Link]
2 config :naive,
3 ecto_repos: [[Link]],
4 binance_client: BinanceMock,
5 trading: %{
6 defaults: %{
7 chunks: 5,
8 budget: 1000.0,
9 buy_down_interval: 0.0001,
10 profit_interval: -0.0012,
11 rebuy_interval: 0.001
12 }
13 }
Moving on to the seeding script, we need to create a new file called seed_settings.exs inside the
/apps/naive/priv/ directory. Let’s start by aliasing the required modules and requiring
the Logger:
1 # /apps/naive/priv/seed_settings.exs
2 require Logger
3
4 alias Decimal
5 alias [Link]
6 alias [Link]
1 # /apps/naive/priv/seed_settings.exs
2 ...
3 binance_client = Application.get_env(:naive, :binance_client)
Now, it’s time to fetch all the symbols(pairs) that Binance supports:
1 # /apps/naive/priv/seed_settings.exs
2 ...
3 [Link]("Fetching exchange info from Binance to create trading settings")
4
5 {:ok, %{symbols: symbols}} = binance_client.get_exchange_info()
Now we need to fetch default trading settings from the config file as well as the current timestamp:
1 # /apps/naive/priv/seed_settings.exs
2 ...
3 %{
4 chunks: chunks,
5 budget: budget,
6 buy_down_interval: buy_down_interval,
7 profit_interval: profit_interval,
8 rebuy_interval: rebuy_interval
9 } = Application.get_env(:naive, :trading).defaults
10
11 timestamp = NaiveDateTime.utc_now()
12 |> [Link](:second)
We will use the default settings for all rows to be able to insert data into the database. Normally we
wouldn’t need to set the inserted_at and updated_at fields, as Ecto would generate those values for us
when using Repo.insert/2, but we won’t be able to use this functionality as it takes a single record
at a time. We will be using Repo.insert_all/3, which is a lower-level function without
nice features like filling in timestamps (sadly). Just to be crystal clear: inserting the 1000+ symbols
currently supported by Binance one by one with Repo.insert/2 takes at least a couple of seconds
(on my machine), whereas Repo.insert_all/3 inserts all of them in a couple of hundred milliseconds.
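The timestamp we prepared earlier is truncated to seconds. To the best of our knowledge, Ecto’s :naive_datetime type (the default used by timestamps()) expects values without microseconds, which would explain the truncation; treat that reasoning as an assumption. A minimal sketch of what the truncation does, using only the standard library:

```elixir
# NaiveDateTime.utc_now/0 returns a value with microsecond precision.
now = NaiveDateTime.utc_now()

# Truncating to :second drops the fractional part, so the microsecond
# field becomes {0, 0} (zero value, zero digits of precision).
timestamp = NaiveDateTime.truncate(now, :second)

IO.inspect(timestamp.microsecond, label: "microsecond after truncation")
```

Because Repo.insert_all/3 does not fill timestamps for us, the same truncated value can be reused for both inserted_at and updated_at in every row.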
As our rows (plain maps) will differ only by the symbol column, we can first create a full map that
will serve as a template:
# /apps/naive/priv/seed_settings.exs
...
base_settings = %{
  symbol: "",
  chunks: chunks,
  budget: Decimal.from_float(budget),
  buy_down_interval: Decimal.from_float(buy_down_interval),
  profit_interval: Decimal.from_float(profit_interval),
  rebuy_interval: Decimal.from_float(rebuy_interval),
  status: "off",
  inserted_at: timestamp,
  updated_at: timestamp
}
We will now map over the retrieved symbols, inject each of them into a copy of the base_settings map
and push all of those maps to the Repo.insert_all/3 function:
1 # /apps/naive/priv/seed_settings.exs
2 ...
3 [Link]("Inserting default settings for symbols")
4
5 maps = symbols
6 |> [Link](&(%{base_settings | symbol: &1["symbol"]}))
7
8 {count, nil} = Repo.insert_all(Settings, maps)
9
10 [Link]("Inserted settings for #{count} symbols")
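The template-and-update pattern used above can be tried without a database. Below is a minimal sketch; the shortened template and the two hardcoded symbols are hypothetical stand-ins for the real settings and for the response from Binance:

```elixir
# Hypothetical template; the real one carries all the settings columns.
base_settings = %{symbol: "", chunks: 5, status: "off"}

# Hypothetical stand-in for the list of symbols returned by the exchange.
symbols = [%{"symbol" => "ETHUSDT"}, %{"symbol" => "NEOUSDT"}]

rows =
  Enum.map(symbols, fn %{"symbol" => symbol} ->
    # `%{map | key: value}` only updates a key that already exists and
    # raises a KeyError otherwise, which guards against typos.
    %{base_settings | symbol: symbol}
  end)

IO.inspect(rows, label: "rows")
```

Every resulting map shares the template’s values and differs only in the symbol key, which is the shape of the list we hand to Repo.insert_all/3.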
1 # /apps/naive/lib/naive/[Link]
2 ...
3 alias [Link]
4 alias [Link]
5 ...
Now we need to modify fetch_symbol_settings/1 to fetch settings from the DB instead of the
hardcoded map. We will use Repo.get_by!/3, as we are unable to trade without settings. The
second trick used here is Map.from_struct/1, which is required because otherwise the result would
become the [Link] struct (this would cause problems further down the line as we are
iterating over the returned map and would get the protocol Enumerable not implemented for
%[Link] error):
1 # /apps/naive/lib/naive/[Link]
2 ...
3 defp fetch_symbol_settings(symbol) do
4 symbol_filters = fetch_symbol_filters(symbol)
5 settings = Repo.get_by!(Settings, symbol: symbol)
6
7 [Link](
8 symbol_filters,
9 settings |> Map.from_struct()
10 )
11 end
12 ...
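To see why Map.from_struct/1 matters, here is a minimal sketch that needs no database. SettingsSketch and the filter values are hypothetical stand-ins for the real schema struct and for the result of fetch_symbol_filters/1:

```elixir
defmodule SettingsSketch do
  # Hypothetical stand-in for the Ecto schema struct.
  defstruct symbol: "ETHUSDT", chunks: 5, status: "off"
end

# struct/1 builds the struct at runtime, which is convenient in a script.
settings = struct(SettingsSketch)

# Hypothetical filters, standing in for fetch_symbol_filters/1.
symbol_filters = %{tick_size: "0.01", step_size: "0.001"}

# Converting the struct first gives us a plain map after the merge.
merged = Map.merge(symbol_filters, Map.from_struct(settings))

# The merged plain map can be iterated...
keys = merged |> Map.keys() |> Enum.sort()

# ...whereas enumerating the struct itself raises Protocol.UndefinedError.
struct_error =
  try do
    Enum.map(settings, fn pair -> pair end)
  rescue
    error in Protocol.UndefinedError -> error.protocol
  end

IO.inspect(keys, label: "keys")
IO.inspect(struct_error, label: "protocol not implemented")
```

Merging the struct directly would carry its __struct__ key into the result, so the merged value would itself be a struct and fail in the same way.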
We can now run the seeding script to fill our database with the default settings:
1 $ cd apps/naive
2 $ mix run priv/seed_settings.exs
3 [Link].341 [info] Fetching exchange info from Binance to create trading settings
4 [Link].571 [info] Inserting default settings for symbols
5 [Link].645 [info] Inserted settings for 1276 symbols
We can verify that records were indeed inserted into the database by connecting to it using the psql
application:
That confirms that there are 1276 settings inside the database that will allow us to continue trading,
which we can check by running our app inside iex (from the main project’s directory):
1 $ iex -S mix
2 ...
3 iex(1)> Naive.start_trading("NEOUSDT")
4 [Link].936 [info] Starting new supervision tree to trade on NEOUSDT
5 {:ok, #PID<0.378.0>}
6 [Link].584 [info] Initializing new trader(1612293637000) for NEOUSDT
The above log messages confirm that the [Link] was able to fetch settings from the database
that were later put into the [Link]’s state and passed to it.
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github³⁶
³⁶[Link]
Chapter 11 - Supervise and autostart streaming
Objectives
• describe and design the required functionality
• register the [Link] processes with names
• set up ecto inside the streamer app
• create and migrate the db
• seed default settings
• implement the supervision tree and start streaming functionality
• implement the stop functionality
• implement the autostart streaming functionality
• test the implementation
• start streaming. This will require a new [Link] process started under the
[Link]. We will put logic responsible for starting that process inside
the [Link] module.
• stop streaming. To be able to stop the [Link] process streaming on a specific symbol,
we will need to know that process’ PID based only on the symbol string (i.e. “ETHUSDT”). To make
that possible, we will need to register every [Link] process with a name that we
will be able to “reverse-engineer” based only on the symbol string, for example:
:"#{__MODULE__}-#{symbol}"
• autostart streaming. At the start of streaming on a symbol, we should persist that action as a
symbol’s streaming setting inside the database. We will need to generate a new [Link],
configure, create and migrate the DB (just like in the last chapter for the naive app) to be able to retrieve
that list. We will write logic that fetches the settings of the symbols and autostarts the ones that are
enabled, and place all that logic inside the [Link] module. We
will introduce a Task³⁷ child process that will utilize the logic from the [Link]
to fetch those enabled symbols and start [Link] processes on startup - we will
describe all of this separately in its own section in this chapter.
1 # /apps/streamer/lib/streamer/[Link]
2 def start_link(symbol) do
3 lowercased_symbol = [Link](symbol) # <= separate variable
4
5 WebSockex.start_link(
6 "#{@stream_endpoint}#{lowercased_symbol}@trade", # <= lowercase symbol
7 __MODULE__,
8 nil,
9 name: :"#{__MODULE__}-#{symbol}" # <= uppercase symbol
10 )
11 end
• we are getting the uppercase symbol but inside the URL we need to use a lowercase symbol so
we will introduce a new separate variable to be used in the URL
• we are registering the process using the uppercase symbol so the name will remain consistent
with the naive application’s processes
• to register the process we are passing a keyword list as the 4th argument to the custom start_link/4
function of the WebSockex module (link to source³⁸ - again, no need to be afraid of reading the
source code in Elixir, that’s the beauty of it)
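The naming trick described above can be tried in isolation. In the sketch below, an Agent stands in for the WebSockex process and NamedWorkerSketch is a hypothetical module name; only the registration and lookup mirror what we do for the streamer:

```elixir
defmodule NamedWorkerSketch do
  use Agent

  # Registers the process under an atom derived from the symbol.
  def start_link(symbol) do
    Agent.start_link(fn -> symbol end, name: name(symbol))
  end

  # Finds the PID again knowing only the symbol string.
  def whereis(symbol), do: Process.whereis(name(symbol))

  # Upcasing keeps the name consistent regardless of the input's case.
  defp name(symbol), do: :"#{__MODULE__}-#{String.upcase(symbol)}"
end

{:ok, pid} = NamedWorkerSketch.start_link("ethusdt")
IO.inspect(NamedWorkerSketch.whereis("ETHUSDT") == pid, label: "found by name")
```

One thing to keep in mind: atoms are never garbage collected, so building names like this is reasonable for a bounded set such as exchange symbols, but not for arbitrary user input.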
³⁷[Link]
³⁸[Link]
1 # /apps/streamer/[Link]
2 ...
3 defp deps do
4 [
5 {:binance, "~> 0.7.1"}, # <= used to retrieve symbols list(exchangeInfo)
6 {:ecto_sql, "~> 3.0"}, # <= added dependency
7 {:ecto_enum, "~> 1.4"}, # <= added dependency
8 {:jason, "~> 1.2"},
9 {:phoenix_pubsub, "~> 2.0"},
10 {:postgrex, ">= 0.0.0"}, # <= added dependency
11 {:websockex, "~> 0.4.2"}
12 ]
13 end
1 $ cd apps/streamer
2 $ mix [Link] -r [Link]
3 * creating lib/streamer
4 * creating lib/streamer/[Link]
5 * updating ../../config/[Link]
6 ...
1 # /config/[Link]
2 config :streamer, # <= added line
3 ecto_repos: [[Link]] # <= added line
4
5 config :streamer, [Link],
6 database: "streamer", # <= database updated
7 username: "postgres", # <= username updated
8 password: "postgres", # <= password updated
9 hostname: "localhost"
The last step will be to update the children list of the [Link] module:
1 # /apps/streamer/lib/streamer/[Link]
2 ...
3 def start(_type, _args) do
4 children = [
5 {[Link], []}, # <= repo added
6 {
7 [Link],
8 name: [Link], adapter_name: [Link].PG2
9 }
10 ]
11 ...
We can safely start just with id, symbol and status columns, where the last one will follow the same
enum idea from the previous chapter:
1 # /apps/streamer/priv/repo/migrations/20210203184805_create_settings.exs
2 defmodule [Link] do
3 use [Link]
4
5 alias [Link]
6
7 def change do
8 StreamingStatusEnum.create_type()
9
10 create table(:settings, primary_key: false) do
11 add(:id, :uuid, primary_key: true)
12 add(:symbol, :text, null: false)
13 add(:status, StreamingStatusEnum.type(), default: "off", null: false)
14
15 timestamps()
16 end
17
18 create(unique_index(:settings, [:symbol]))
19 end
20 end
That finishes our work on the migration file. We need to add the StreamingStatusEnum in the
same way as in the last chapter (create a schema directory inside the apps/streamer/lib/streamer
directory and a new file called streaming_status_enum.ex with the logic below, defining the enum):
1 # /apps/streamer/lib/streamer/schema/streaming_status_enum.ex
2 import EctoEnum
3
4 defenum([Link], :streaming_status, [:on, :off])
Let’s run the migration to create the table, unique index, and the enum:
1 $ mix [Link]
2 [Link].850 [info] == Running 20210203184805 [Link]\
3 [Link]/0 forward
4 [Link].850 [info] execute "CREATE TYPE public.streaming_status AS ENUM ('on', 'of\
5 f')"
6 [Link].851 [info] create table settings
7 [Link].912 [info] create index settings_symbol_index
8 [Link].932 [info] == Migrated 20210203184805 in 0.0s
We can now create a schema file for the settings table. Inside the /apps/streamer/lib/streamer/schema
directory create a file called [Link]:
1 # /apps/streamer/lib/streamer/schema/[Link]
2 defmodule [Link] do
3 use [Link]
4
5 alias [Link]
6
7 @primary_key {:id, :binary_id, autogenerate: true}
8
9 schema "settings" do
10 field(:symbol, :string)
11 field(:status, StreamingStatusEnum)
12
13 timestamps()
14 end
15 end
We are now ready to query the table but first, we need to insert the default settings into the database.
1 # /apps/streamer/priv/seed_settings.exs
2 require Logger
3
4 alias Decimal
5 alias [Link]
6 alias [Link]
7
8 [Link]("Fetching exchange info from Binance to create streaming settings")
9
10 {:ok, %{symbols: symbols}} = Binance.get_exchange_info()
11
12 timestamp = NaiveDateTime.utc_now()
13 |> [Link](:second)
14
15 base_settings = %{
16 symbol: "",
17 status: "off",
18 inserted_at: timestamp,
19 updated_at: timestamp
20 }
21
22 [Link]("Inserting default settings for symbols")
23
24 maps = symbols
25 |> [Link](&(%{base_settings | symbol: &1["symbol"]}))
26
27 {count, nil} = Repo.insert_all(Settings, maps)
28
29 [Link]("Inserted settings for #{count} symbols")
1 $ cd apps/streamer
2 $ mix run priv/seed_settings.exs
3 [Link].675 [info] Fetching exchange info from Binance to create streaming settings
4 [Link].386 [info] Inserting default settings for symbols
5 [Link].448 [info] Inserted settings for 1277 symbols
# /apps/streamer/lib/streamer/dynamic_streamer_supervisor.ex
defmodule [Link] do
  use DynamicSupervisor

  def start_link(init_arg) do
    DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  def init(_init_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end
³⁹[Link]
Next, we will add the start_streaming/1 function at the bottom of the [Link]
module:
1 # /apps/streamer/lib/streamer/dynamic_streamer_supervisor.ex
2 ...
3 def start_streaming(symbol) when is_binary(symbol) do
4 symbol = [Link](symbol)
5
6 case get_pid(symbol) do
7 nil ->
8 [Link]("Starting streaming on #{symbol}")
9 {:ok, _settings} = update_streaming_status(symbol, "on")
10 {:ok, _pid} = start_streamer(symbol)
11
12 pid ->
13 [Link]("Streaming on #{symbol} already started")
14 {:ok, _settings} = update_streaming_status(symbol, "on")
15 {:ok, pid}
16 end
17 end
To unpack the above: we check whether there is a streamer process already running for the passed symbol
and, based on the result of that check, we either start a new streaming process (and update the
symbol’s settings) or just update the symbol’s settings.
Inside the start_streaming/1 function, we are using 3 helper functions that we need to add at the
bottom of the file:
1 # /apps/streamer/lib/streamer/dynamic_streamer_supervisor.ex
2 defp get_pid(symbol) do
3 [Link](:"[Link]-#{symbol}")
4 end
5
6 defp update_streaming_status(symbol, status)
7 when is_binary(symbol) and is_binary(status) do
8 Repo.get_by(Settings, symbol: symbol)
9 |> [Link](%{status: status})
10 |> [Link]()
11 end
12
13 defp start_streamer(symbol) do
14 DynamicSupervisor.start_child(
15 [Link],
16 {[Link], symbol}
17 )
18 end
The above functions are quite self-explanatory: get_pid/1 is a convenience wrapper,
update_streaming_status/2 updates the status field for the passed symbol, and start_streamer/1
instructs the [Link] to start a new [Link] process with
the symbol passed as the first argument.
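The start and stop flow can be sketched without the database or a websocket. In the example below, StreamSupervisorSketch is a hypothetical module, an Agent stands in for the streamer process and the settings update is left out entirely, so it only illustrates the "look up by name, then start or terminate" part:

```elixir
defmodule StreamSupervisorSketch do
  use DynamicSupervisor

  def start_link(init_arg) do
    DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg), do: DynamicSupervisor.init(strategy: :one_for_one)

  # Starts a worker only when none is registered for the symbol yet.
  def start_streaming(symbol) when is_binary(symbol) do
    symbol = String.upcase(symbol)

    case Process.whereis(worker_name(symbol)) do
      nil -> DynamicSupervisor.start_child(__MODULE__, worker_spec(symbol))
      pid -> {:ok, pid}
    end
  end

  # Terminates the worker registered for the symbol, if there is one.
  def stop_streaming(symbol) when is_binary(symbol) do
    symbol = String.upcase(symbol)

    case Process.whereis(worker_name(symbol)) do
      nil -> :ok
      pid -> DynamicSupervisor.terminate_child(__MODULE__, pid)
    end
  end

  # Assumption: an Agent stands in for the real streamer process.
  defp worker_spec(symbol) do
    %{
      id: symbol,
      start: {Agent, :start_link, [fn -> symbol end, [name: worker_name(symbol)]]}
    }
  end

  defp worker_name(symbol), do: :"#{__MODULE__}.Worker-#{symbol}"
end

{:ok, _sup} = StreamSupervisorSketch.start_link([])
{:ok, pid} = StreamSupervisorSketch.start_streaming("ethusdt")
IO.inspect(pid, label: "worker")
```

Calling start_streaming/1 a second time for the same symbol returns the existing PID instead of starting a duplicate, which is the behaviour we rely on when autostarting.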
The last step to get the above function to work (and future ones in this module) is to add a
require, an import and a few aliases at the top of the module:
1 # /apps/streamer/lib/streamer/dynamic_streamer_supervisor.ex
2 require Logger
3
4 import [Link], only: [from: 2]
5
6 alias [Link]
7 alias [Link]
1 # /apps/streamer/lib/[Link]
2 ...
3 alias [Link]
4
5 defdelegate start_streaming(symbol), to: DynamicStreamerSupervisor
As we don’t need to put any logic inside the Streamer.start_streaming/1 function, we can just
delegate the call straight to the [Link] module.
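For readers new to defdelegate, here is a minimal standalone sketch. Both FacadeSketch modules are hypothetical and the worker simply echoes the upcased symbol:

```elixir
defmodule FacadeSketch.Worker do
  # Hypothetical implementation module holding the actual logic.
  def start_streaming(symbol), do: {:ok, String.upcase(symbol)}
end

defmodule FacadeSketch do
  # Defines FacadeSketch.start_streaming/1, which forwards its argument
  # to FacadeSketch.Worker.start_streaming/1 and returns its result.
  defdelegate start_streaming(symbol), to: FacadeSketch.Worker
end

IO.inspect(FacadeSketch.start_streaming("ethusdt"), label: "delegated call")
```

The public module stays a thin interface, so callers never need to know which module implements the function.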
The last step will be to append the [Link] to the children list of the
[Link]:
1 # /apps/streamer/lib/streamer/[Link]
2 def start(_type, _args) do
3 children = [
4 ...
5 {[Link], []}
6 ]
At this moment our supervision tree already works and all streamer processes are being monitored
by the [Link]:
1 # /apps/streamer/lib/streamer/dynamic_streamer_supervisor.ex
2 def stop_streaming(symbol) when is_binary(symbol) do
3 symbol = [Link](symbol)
4
5 case get_pid(symbol) do
6 nil ->
7 [Link]("Streaming on #{symbol} already stopped")
8 {:ok, _settings} = update_streaming_status(symbol, "off")
9
10 pid ->
11 [Link]("Stopping streaming on #{symbol}")
12
13 :ok =
14 DynamicSupervisor.terminate_child(
15 [Link],
16 pid
17 )
18
19 {:ok, _settings} = update_streaming_status(symbol, "off")
20 end
21 end
1 # /apps/streamer/lib/[Link]
2 ...
3 defdelegate stop_streaming(symbol), to: DynamicStreamerSupervisor
4 ...
In cases like those, a new level of supervision needs to be introduced that will have a different
supervision strategy for those “coupled” processes. We will rename the process name of the
[Link] (which is currently registered as [Link]) to [Link].
Then we will introduce the new [Link] module and register it under the same name.
We will attach both the [Link] and the Task to the [Link]
and give it the rest_for_one strategy, which restarts the children started after [Link]
(here, the Task) whenever it dies. Note that a Task child uses restart: :temporary by default and
temporary children are never restarted, so the Task’s child spec needs a different :restart value for the
autostart logic to run again.
Updated supervision tree with additional supervisor and renamed Application process
1 # /apps/streamer/lib/streamer/dynamic_streamer_supervisor.ex
2
3 # add below after the `init/1` function
4 def autostart_streaming() do
5 fetch_symbols_to_stream()
6 |> [Link](&start_streaming/1)
7 end
8
9 # and this at the end of the module
10 defp fetch_symbols_to_stream() do
11 [Link](
12 from(s in Settings,
13 where: [Link] == "on",
14 select: [Link]
15 )
16 )
17 end
The autostart_streaming/0 function fetches all symbols from the settings table with status == "on",
then passes them one by one into the start_streaming/1 function using Enum.map/2.
We can now focus on referring to the above autostarting logic inside the new supervisor that we will
create now. Let’s start by creating a new file called [Link] inside the /apps/streamer/lib/streamer/
directory and fill it with the default Supervisor⁴¹ implementation:
1 # /apps/streamer/lib/streamer/[Link]
2 defmodule [Link] do # <= updated module name
3 use Supervisor
4
5 def start_link(init_arg) do
6 Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
7 end
8
9 def init(_init_arg) do
10 children = [
11
12 ]
13
14 [Link](children, strategy: :one_for_one)
15 end
16 end
1 # /apps/streamer/lib/streamer/[Link]
2 def init(_init_arg) do
3 ...
4 [Link](children, strategy: :rest_for_one) # <= strategy updated
5 end
The last step inside our new supervisor will be to add 2 children: [Link]
and Task (that will autostart the symbol streamers):
⁴¹[Link]
1 # /apps/streamer/lib/streamer/[Link]
2 def init(_init_arg) do
3 children = [
4 {[Link], []},
5 {Task,
6 fn ->
7 [Link].autostart_streaming()
8 end}
9 ]
10 ...
11 end
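A self-contained sketch of this supervisor layout, runnable as a script. The AutostartSketch names, the Agent processes standing in for streamers, the symbols option and the notify message are all assumptions made for illustration. The restart: :transient override is also our addition: a Task child is :temporary by default and temporary children are never restarted, so this sketch assumes that a transient child spec is kept around and started again by rest_for_one.

```elixir
defmodule AutostartSketch.Supervisor do
  use Supervisor

  @dynamic AutostartSketch.DynamicSupervisor

  def start_link(opts) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    notify = Keyword.fetch!(opts, :notify)
    symbols = Keyword.fetch!(opts, :symbols)

    autostart_task =
      Supervisor.child_spec(
        {Task, fn -> autostart(symbols, notify) end},
        # Assumption: :transient lets the Task run again after a restart
        # of the child listed before it (the Task default is :temporary).
        restart: :transient
      )

    children = [
      {DynamicSupervisor, name: @dynamic, strategy: :one_for_one},
      autostart_task
    ]

    # rest_for_one: when a child dies, the children after it restart too.
    Supervisor.init(children, strategy: :rest_for_one)
  end

  # Starts one stand-in worker per symbol and reports back to `notify`.
  defp autostart(symbols, notify) do
    for symbol <- symbols do
      {:ok, _pid} = DynamicSupervisor.start_child(@dynamic, {Agent, fn -> symbol end})
    end

    send(notify, {:autostarted, length(symbols)})
  end
end

{:ok, _sup} =
  AutostartSketch.Supervisor.start_link(notify: self(), symbols: ["ETHUSDT", "NEOUSDT"])

# Wait for the Task to report that the autostart ran.
receive do
  {:autostarted, count} -> IO.puts("autostarted #{count} workers")
after
  2_000 -> raise "autostart did not run"
end
```

The order of the children list matters here: the dynamic supervisor has to be running before the Task tries to start children under it.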
1 # /apps/streamer/lib/streamer/[Link]
2 ...
3 children = [
4 {[Link], []},
5 {
6 [Link],
7 name: [Link], adapter_name: [Link].PG2
8 },
9 {[Link], []} # <= updated supervisor
10 ]
11
12 opts = [strategy: :one_for_one, name: [Link]] # <= updated name
13 ...
1 $ iex -S mix
2 ...
3 iex(1)> Streamer.start_streaming("ethusdt")
4 [Link].809 [info] Starting streaming on ETHUSDT
5 {:ok, #PID<0.370.0>}
6 iex(2)> Streamer.start_streaming("neousdt")
7 [Link].288 [info] Starting streaming on NEOUSDT
8 {:ok, #PID<0.377.0>}
1 $ iex -S mix
2 ...
3 iex(1)>
4 [Link].920 [info] Starting streaming on ETHUSDT
5 [Link].306 [info] Starting streaming on NEOUSDT
We can also confirm that streamer processes are there by using :observer.start():
Updated supervision tree with two autostarted streamers and new supervisor
1 iex(5)> Streamer.stop_streaming("neousdt")
2 [Link].205 [info] Stopping streaming on NEOUSDT
3 {:ok,
4 %[Link]{
5 ...
6 }}
7 iex(6)> Streamer.stop_streaming("ethusdt")
8 [Link].553 [info] Stopping streaming on ETHUSDT
9 {:ok,
10 %[Link]{
11 ...
12 }}
Stop the iex session and start a new one - streamers shouldn’t be autostarted any more.
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github⁴²
⁴²[Link]
Chapter 12 - Start, stop, shutdown and autostart trading
Objectives
• describe and design the required functionality
• (re-)implement the start trading functionality
• implement the stop trading functionality
• implement the autostart trading functionality
• implement the shutdown trading functionality
• test the implementation
• stop trading - as the [Link] processes are registered with names that can
be easily reverse engineered, we should be able to utilize the Process.whereis/1 function
to retrieve the PIDs and instruct the [Link] to terminate those
child processes. Again, we need to put that logic somewhere so we will implement the
[Link] as a full module using the DynamicSupervisor behavior.
• start trading - as our [Link] will now be a module we will be able
to remove the start_trading/1 implementation from the Naive module and reimplement it
inside the [Link] module. It will follow the same pattern of checking
for the PID, starting the [Link] process and flipping the status flag inside the
settings table’s row for that symbol.
• shutdown trading - sometimes abruptly stopping trading won’t be the best solution; it would
be better to allow the [Link] processes to finish their ongoing trades. To be able to do
that we will need to inform the [Link] process assigned to the symbol that the settings
for that symbol have been updated, and that should cause the [Link] process to withhold
starting new [Link] processes and terminate the whole tree when the last trader
finishes.
• autostart trading - this will be a very similar implementation to the one from the last chapter. It
will require introducing a new supervisor (we will follow the same naming convention: rename
[Link]’s registered process name to [Link], create a new supervisor
called [Link]) and utilizing the Task process to execute the autostarting logic.
Final supervision tree after adding the autostarting Task and the [Link]
1 # /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
2 defmodule [Link] do # <= module updated
3 use DynamicSupervisor
4
5 require Logger # <= Logger added
6
7 import [Link], only: [from: 2] # <= added for querying
8
9 alias [Link] # <= added for querying/updating
10 alias [Link] # <= added for querying/updating
11
12 def start_link(init_arg) do
13 DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
14 end
15
16 def init(_init_arg) do
17 [Link](strategy: :one_for_one)
18 end
19 end
The above code is a default implementation from the DynamicSupervisor docs⁴³ with some
additional imports, require and aliases as we will use them in this chapter.
Our start_trading/1 implementation is almost the same as the one for the streamer application from
the last chapter:
⁴³[Link]
1 # /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
2 ...
3 def start_trading(symbol) when is_binary(symbol) do
4 symbol = [Link](symbol)
5
6 case get_pid(symbol) do
7 nil ->
8 [Link]("Starting trading of #{symbol}")
9 {:ok, _settings} = update_trading_status(symbol, "on")
10 {:ok, _pid} = start_symbol_supervisor(symbol)
11
12 pid ->
13 [Link]("Trading on #{symbol} already started")
14 {:ok, _settings} = update_trading_status(symbol, "on")
15 {:ok, pid}
16 end
17 end
18 ...
1 # /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
2 defp get_pid(symbol) do
3 [Link](:"[Link]-#{symbol}")
4 end
5
6 defp update_trading_status(symbol, status)
7 when is_binary(symbol) and is_binary(status) do
8 Repo.get_by(Settings, symbol: symbol)
9 |> [Link](%{status: status})
10 |> [Link]()
11 end
12
13 defp start_symbol_supervisor(symbol) do
14 DynamicSupervisor.start_child(
15 [Link],
16 {[Link], symbol}
17 )
18 end
Both the implementation and the helper functions are almost the same as the ones inside the streamer
application. It could be tempting to abstract some of the logic away, but remember that we should
treat all applications in our umbrella project as standalone services that should not share any code
if possible (we broke that rule for the TradeEvent struct from the streamer app, but we could easily
make a lib with that struct that would be shared between the two applications). I would shy away
from sharing any business logic between applications in the umbrella project.
There are two additional places where we need to make updates to get our start_trading/1 to work
again:
1 # /apps/naive/lib/naive/[Link]
2 ...
3 children = [
4 {[Link], []},
5 {[Link], []} # <= replacement of DynamicSupervisor
6 ]
• we need to replace the start_trading/1 implementation inside the Naive module with the defdelegate
macro (as we don’t have any logic to run there):
1 # /apps/naive/lib/[Link]
2 ...
3 alias [Link]
4
5 defdelegate start_trading(symbol), to: DynamicSymbolSupervisor
6 ...
At this moment we are again able to use the Naive.start_trading/1 function to start trading on a
symbol (behind the scenes it will use logic from the new [Link] module).
1 # /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
2 ...
3 def stop_trading(symbol) when is_binary(symbol) do
4 symbol = [Link](symbol)
5
6 case get_pid(symbol) do
7 nil ->
8 [Link]("Trading on #{symbol} already stopped")
9 {:ok, _settings} = update_trading_status(symbol, "off")
10
11 pid ->
12 [Link]("Stopping trading of #{symbol}")
13
14 :ok =
15 DynamicSupervisor.terminate_child(
16 [Link],
17 pid
18 )
19
20 {:ok, _settings} = update_trading_status(symbol, "off")
21 end
22 end
23 ...
The second change we need to make is to create a forwarding interface using defdelegate inside
the Naive module:
1 # /apps/naive/lib/[Link]
2 ...
3 defdelegate stop_trading(symbol), to: DynamicSymbolSupervisor
4 ...
That pretty much finishes the stop_trading/1 functionality. We are now able to start and stop (which
was previously not possible) trading on a symbol.
1 # /apps/naive/lib/naive/[Link]
2 defmodule [Link] do
3 use Supervisor
4
5 def start_link(init_arg) do
6 Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
7 end
8
9 def init(_init_arg) do
10 children = [
11 {[Link], []}, # <= added
12 {Task, # <= added
13 fn -> # <= added
14 [Link].autostart_trading() # <= added
15 end} # <= added
16 ]
17
18 [Link](children, strategy: :rest_for_one) # <= strategy updated
19 end
20 end
1 # /apps/naive/lib/naive/[Link]
2 ...
3 def start(_type, _args) do
4 children = [
5 {[Link], []},
6 {[Link], []} # <= replacement for DynamicSymbolSupervisor
7 ]
8
9 opts = [strategy: :one_for_one, name: [Link]] # <= name updated
1 # /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
2 ...
3 # add the below function after the `init/1` function
4 def autostart_trading() do
5 fetch_symbols_to_trade()
6 |> [Link](&start_trading/1)
7 end
8
9 ...
10
11 # and this helper at the end of the module
12 defp fetch_symbols_to_trade() do
13 [Link](
14 from(s in Settings,
15 where: [Link] == "on",
16 select: [Link]
17 )
18 )
19 end
20 ...
Those are the same (excluding updated function names) as inside the streamer application. We are
fetching enabled symbols and starting new [Link] processes for each one.
At this moment we can already see our implementation in action:
At this moment we are able to test the current implementation inside the iex:
1 $ iex -S mix
2 ...
3 iex(1)> Naive.start_trading("ethusdt")
4 [Link].207 [info] Starting trading of ETHUSDT
5 [Link].261 [info] Starting new supervision tree to trade on ETHUSDT
6 {:ok, #PID<0.372.0>}
7 [Link].344 [info] Initializing new trader(1612647333342) for ETHUSDT
8 iex(3)> Naive.start_trading("neousdt")
9 [Link].128 [info] Starting trading of NEOUSDT
1 $ iex -S mix
2 ...
3 [Link].894 [info] Starting trading of ETHUSDT
4 [Link].938 [info] Starting new supervision tree to trade on ETHUSDT
5 [Link].786 [info] Initializing new trader(1612647558784) for ETHUSDT
6 iex(1)>
The above logs confirm that the naive application autostarts the previously enabled symbols (using
the start_trading/1 function), and that stop_trading/1 updates the status inside the database (so
the symbol isn’t autostarted at the next initialization).
1 # /apps/naive/lib/[Link]
2 ...
3 defdelegate shutdown_trading(symbol), to: DynamicSymbolSupervisor
4 ...
1 # /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
2 ...
3 def shutdown_trading(symbol) when is_binary(symbol) do
4 symbol = [Link](symbol)
5
6 case get_pid(symbol) do
7 nil ->
8 [Link]("Trading on #{symbol} already stopped")
9 {:ok, _settings} = update_trading_status(symbol, "off")
10
11 _pid ->
12 [Link]("Shutdown of trading on #{symbol} initialized")
13 {:ok, settings} = update_trading_status(symbol, "shutdown")
14 [Link](:settings_updated, settings)
15 {:ok, settings}
16 end
17 end
18 ...
The crucial part of the implementation above is the notify(:settings_updated, settings) where
we inform the [Link] process that it needs to update trading settings.
Currently, the [Link] module does not support updating the settings after startup - let’s add
a new interface function together with a callback function that will handle settings updating:
1 # /apps/naive/lib/naive/[Link]
2 ...
3 # add the below function as the last clause of the `notify` function
4 def notify(:settings_updated, settings) do
5 [Link](
6 :"#{__MODULE__}-#{[Link]}",
7 {:update_settings, settings}
8 )
9 end
10
11 # add the below handler function as the last clause of `handle_call` function
12 def handle_call(
13 {:update_settings, new_settings},
14 _,
15 state
16 ) do
17 {:reply, :ok, %{state | settings: new_settings}}
18 end
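The notify and handle_call pair can be exercised in isolation. Below is a minimal sketch: LeaderSketch, its initial settings and the settings/1 helper are hypothetical and exist only to show a named GenServer swapping its settings at runtime:

```elixir
defmodule LeaderSketch do
  use GenServer

  def start_link(symbol) do
    GenServer.start_link(__MODULE__, symbol, name: name(symbol))
  end

  # Interface function: finds the process by symbol and pushes new settings.
  def notify(:settings_updated, %{symbol: symbol} = settings) do
    GenServer.call(name(symbol), {:update_settings, settings})
  end

  # Hypothetical helper, added only so we can inspect the state.
  def settings(symbol), do: GenServer.call(name(symbol), :get_settings)

  @impl true
  def init(symbol) do
    {:ok, %{symbol: symbol, settings: %{symbol: symbol, status: "on"}}}
  end

  @impl true
  def handle_call({:update_settings, new_settings}, _from, state) do
    {:reply, :ok, %{state | settings: new_settings}}
  end

  def handle_call(:get_settings, _from, state) do
    {:reply, state.settings, state}
  end

  defp name(symbol), do: :"#{__MODULE__}-#{symbol}"
end

{:ok, _pid} = LeaderSketch.start_link("ETHUSDT")
:ok = LeaderSketch.notify(:settings_updated, %{symbol: "ETHUSDT", status: "shutdown"})
IO.inspect(LeaderSketch.settings("ETHUSDT"), label: "settings after update")
```

Using a call rather than a cast means the caller only continues once the process has applied the new settings.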
Ok, we have a way to update the settings of the [Link] process “on the go” but what effects
should the shutdown state have on the [Link]’s actions?
There are two places that require modification:
• whenever the [Link] process finishes the trade cycle, the [Link] process should
not start a new one; it should also check whether that was the last trader process and, if that was the
case, call the Naive.stop_trading/1 function with its symbol to terminate the whole
supervision tree for that symbol
• whenever the [Link] process receives a rebuy notification, it should just ignore it
when the symbol is in the shutdown state.
1 # /apps/naive/lib/naive/[Link]
2 ...
3 def handle_info(
4 {:DOWN, _ref, :process, trader_pid, :normal},
5 %{traders: traders, symbol: symbol, settings: settings} = state
6 ) do
7 [Link]("#{symbol} trader finished trade - restarting")
8
9 case Enum.find_index(traders, &(&[Link] == trader_pid)) do
10 nil ->
11 [Link](
12 "Tried to restart finished #{symbol} " <>
13 "trader that leader is not aware of"
14 )
15
16 if [Link] == "shutdown" and traders == [] do # <= additional check
17 Naive.stop_trading([Link])
18 end
19
20 {:noreply, state}
21
22 index ->
23 new_traders =
24 if [Link] == "shutdown" do # <= refactored code
25 [Link](
26 "The leader won't start a new trader on #{symbol} " <>
27 "as symbol is in the 'shutdown' state"
28 )
29
30 if length(traders) == 1 do
31 Naive.stop_trading([Link])
32 end
33
34 List.delete_at(traders, index)
35 else
36 new_trader_data = start_new_trader(fresh_trader_state(settings))
37 List.replace_at(traders, index, new_trader_data)
38 end
39
40 {:noreply, %{state | traders: new_traders}}
41 end
42 end
43 ...
As visible in the above code, whenever the [Link] process finishes the trade cycle, the
[Link] process checks whether it can find a record of that trader in its state (no changes here).
We modified the callback so the leader process checks the [Link]. In the shutdown
status, it checks whether that was the last trader in the traders list, to terminate the whole tree at that
time (using the Naive.stop_trading/1 function).
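The branching described above can be separated from the process machinery. The sketch below is a pure-function version of that decision; ShutdownSketch, the tuple it returns and the start_new callback are hypothetical and only illustrate the logic, not the leader’s actual code:

```elixir
defmodule ShutdownSketch do
  # Returns {new_traders, stop_tree?} for a trader that finished at `index`.
  def on_trader_finished(traders, index, status, start_new) do
    case status do
      "shutdown" ->
        # Do not replace the finished trader; stop the tree once none remain.
        remaining = List.delete_at(traders, index)
        {remaining, remaining == []}

      _other ->
        # Normal operation: put a fresh trader in the finished one's slot.
        {List.replace_at(traders, index, start_new.()), false}
    end
  end
end

start_new = fn -> :new_trader end

IO.inspect(ShutdownSketch.on_trader_finished([:a, :b], 0, "on", start_new))
IO.inspect(ShutdownSketch.on_trader_finished([:a, :b], 0, "shutdown", start_new))
IO.inspect(ShutdownSketch.on_trader_finished([:a], 0, "shutdown", start_new))
```

Keeping the decision in a pure function like this makes the three cases (replace, remove, remove and stop) easy to test without starting any processes.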
The second callback that we need to modify is the rebuy triggered:
# /apps/naive/lib/naive/[Link]
...
def handle_call(
      {:rebuy_triggered, new_trader_state},
      {trader_pid, _},
      %{traders: traders, symbol: symbol, settings: settings} = state
    ) do
  case Enum.find_index(traders, &(&[Link] == trader_pid)) do
    nil ->
      [Link]("Rebuy triggered by trader that leader is not aware of")
      {:reply, :ok, state}

    index ->
      traders =
        if [Link] == length(traders) do
          [Link]("All traders already started for #{symbol}")
          traders
        else
          if [Link] == "shutdown" do # <= refactored code
            [Link](
In the above rebuy_triggered handler function we added branching on the [Link] and
we simply ignore the rebuy notification when the symbol is in the shutdown status.
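The branching described above can be sketched as a small pure function. This is an illustration only: `RebuySketch`, `handle_rebuy/3`, the `max_traders` argument, and the returned atoms are hypothetical names that do not appear in the project.

```elixir
defmodule RebuySketch do
  # Hypothetical helper: decides how the leader reacts to a rebuy
  # notification. `max_traders` stands in for the configured limit of
  # traders running in parallel.
  def handle_rebuy(traders_count, max_traders, _status)
      when traders_count == max_traders,
      do: :all_traders_started

  # In the shutdown status the notification is ignored.
  def handle_rebuy(_traders_count, _max_traders, "shutdown"), do: :ignore

  # Otherwise a new trader can be started.
  def handle_rebuy(_traders_count, _max_traders, _status), do: :start_new_trader
end

IO.inspect(RebuySketch.handle_rebuy(5, 5, "on"))
IO.inspect(RebuySketch.handle_rebuy(2, 5, "shutdown"))
IO.inspect(RebuySketch.handle_rebuy(2, 5, "on"))
```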
The final change will be to create a new migration that will update the TradingStatusEnum to
include the shutdown option:
$ cd apps/naive
$ mix [Link] update_trading_status
* creating priv/repo/migrations/20210205232303_update_trading_status.exs
Inside the generated migration file we need to execute a raw SQL command:
# /apps/naive/priv/repo/migrations/20210205232303_update_trading_status.exs
defmodule [Link] do
  use [Link]

  @disable_ddl_transaction true

  def change do
    [Link] "ALTER TYPE trading_status ADD VALUE IF NOT EXISTS 'shutdown'"
  end
end
# /apps/naive/lib/naive/schema/trading_status_enum.ex
import EctoEnum

defenum([Link], :trading_status, [:on, :off, :shutdown])
$ iex -S mix
...
iex(1)> Streamer.start_streaming("ethusdt")
[Link].651 [info] Starting streaming on ETHUSDT
{:ok, #PID<0.372.0>}
iex(2)> Naive.start_trading("ethusdt")
[Link].830 [info] Starting trading of ETHUSDT
[Link].867 [info] Starting new supervision tree to trade on ETHUSDT
{:ok, #PID<0.379.0>}
[Link].816 [info] Initializing new trader(1612648004814) for ETHUSDT
...
[Link].448 [info] Rebuy triggered for ETHUSDT by the trader(1612648004814)
...
[Link].900 [info] Rebuy triggered for ETHUSDT by the trader(1612648089409)
...
[Link].927 [info] Rebuy triggered for ETHUSDT by the trader(1612648198900)
...
[Link].202 [info] Rebuy triggered for ETHUSDT by the trader(1612648326215)
[Link].250 [info] Rebuy triggered for ETHUSDT by the trader(1612648325512)
[Link].250 [info] All traders already started for ETHUSDT

# at this moment we have 5 `[Link]` processes trading in parallel

iex(4)> Naive.shutdown_trading("ethusdt")
[Link].556 [info] Shutdown of trading on ETHUSDT initialized
{:ok,
 %[Link]{
   ...
 }}
...
[Link].855 [info] Trader(1612648407202) finished trade cycle for ETHUSDT
[Link].855 [info] ETHUSDT trader finished trade - restarting
[Link].855 [warn] The leader won't start a new trader on ETHUSDTas symbol is in shutdown state
As we can see from the logs above, our naive strategy grew from 1 to 5 [Link] processes
running in parallel, and then we called the shutdown_trading/1 function. In the shutdown status,
the [Link] process ignored rebuy notifications and didn’t start any new [Link]
processes as the old ones were finishing. At the moment when the last [Link] process finished
the trade cycle, the [Link] called stop_trading/1 on its symbol, terminating the whole
supervision tree for that symbol.
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github⁴⁴
⁴⁴[Link]
Chapter 13 - Abstract duplicated supervision code
Objectives
• overview of requirements
• pseudo generalize [Link] module
• utilize pseudo generalized code inside the [Link]
• implement a truly generic [Link]
• use the [Link] module inside the streamer application
Overview of requirements
In the last few chapters, we went through adding and modifying the dynamic supervision tree
around the naive and streamer applications’ workers. Initially, we just copied the implementation
from the streamer application to the naive application (with a few minor tweaks like log messages).
That wasn’t the most sophisticated solution and we will address this copy-paste pattern in this
chapter.
We will write an “extension” of the DynamicSupervisor that allows us to start, stop, and autostart
workers. Just to keep things simple, we will create a new application inside our umbrella project
where we will place the logic. This will save us from creating a new repo for the time being.
Our new [Link] module will hold all the logic responsible for starting, stopping,
and autostarting worker processes. To limit the boilerplate inside the implementation modules (like
[Link] or [Link]), we will utilize the use
macro that will dynamically generate low-level wiring for us.
$ cd apps
$ mix new core
* creating [Link]
* creating .[Link]
* creating .gitignore
* creating [Link]
* creating lib
* creating lib/[Link]
* creating test
* creating test/test_helper.exs
* creating test/core_test.exs
...
We can now create a new directory called core inside the apps/core/lib directory and a new file
called service_supervisor.ex inside it where we will put all abstracted starting/stopping/autostarting
logic.
Let’s start with an empty module:
# /apps/core/lib/core/service_supervisor.ex
defmodule [Link] do

end
The first step in our refactoring process will be to move (cut) all of the functions from the
[Link] (excluding the start_link/1, init/1 and shutdown_trading/1)
and put them inside the [Link] module, which should look as follows:
# /apps/core/lib/core/service_supervisor.ex
defmodule [Link] do

  def autostart_trading() do
    ...
  end

  def start_trading(symbol) when is_binary(symbol) do
    ...
  end

  def stop_trading(symbol) when is_binary(symbol) do
    ...
  end

  defp get_pid(symbol) do
    ...
  end

  defp update_trading_status(symbol, status)
       when is_binary(symbol) and is_binary(status) do
    ...
  end

  defp start_symbol_supervisor(symbol) do
    ...
  end

  defp fetch_symbols_to_trade() do
    ...
  end
end
All of the above code is trading related - we need to rename functions/logs to be more generic.
Starting with autostart_trading/0 we can rename it to autostart_workers/0:
# /apps/core/lib/core/service_supervisor.ex
...
def autostart_workers() do # <= updated function name
  fetch_symbols_to_start() # <= updated function name
  |> [Link](&start_worker/1) # <= updated function name
end
...
As we renamed two functions called inside the autostart_workers/0, we need to update their
implementations.
The start_trading/1 will become start_worker/1. Internally, we will inline the
start_symbol_supervisor/1 function (move its contents inside the start_worker/1 function and remove
the start_symbol_supervisor/1 function) as it’s used just once inside this module. The
update_trading_status/2 function also needs to be renamed to update_status/2.
# /apps/core/lib/core/service_supervisor.ex
def start_worker(symbol) when is_binary(symbol) do # <= updated name
  symbol = [Link](symbol)

  case get_pid(symbol) do
    nil ->
      [Link]("Starting trading of #{symbol}")
      {:ok, _settings} = update_status(symbol, "on") # <= updated name

      {:ok, _pid} =
        DynamicSupervisor.start_child(
          [Link],
          {[Link], symbol}
        ) # ^^^^^^ inlined `start_symbol_supervisor/1`

    pid ->
      [Link]("Trading on #{symbol} already started")
      {:ok, _settings} = update_status(symbol, "on") # <= updated name
      {:ok, pid}
  end
end

...

defp fetch_symbols_to_start() do # <= updated name
  ...
end
Inside the above code we updated the update_trading_status/2 call to update_status/2 so we need
to update the function header to match:
# /apps/core/lib/core/service_supervisor.ex
defp update_status(symbol, status) # <= updated name
     when is_binary(symbol) and is_binary(status) do
  ...
end
The last function to rename in this module will be stop_trading/1, which becomes stop_worker/1. We
also need to update calls to update_trading_status/2 to update_status/2 as it was renamed:
# /apps/core/lib/core/service_supervisor.ex
def stop_worker(symbol) when is_binary(symbol) do # <= updated name
  symbol = [Link](symbol)

  case get_pid(symbol) do
    nil ->
      [Link]("Trading on #{symbol} already stopped")
      {:ok, _settings} = update_status(symbol, "off") # <= updated name

    pid ->
      [Link]("Stopping trading of #{symbol}")

      :ok =
        DynamicSupervisor.terminate_child(
          [Link],
          pid
        )

      {:ok, _settings} = update_status(symbol, "off") # <= updated name
  end
end
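The start/stop pattern used by both functions (look up a named process, start it under the DynamicSupervisor only if it is missing, terminate it only if it exists) can be tried out without a database. The sketch below is a simplified illustration, assuming a hypothetical `SketchWorker` Agent and `SketchSupervisor` module; it leaves out the status update entirely.

```elixir
defmodule SketchWorker do
  # Hypothetical worker: an Agent registered under a per-symbol name.
  use Agent

  def start_link(symbol) do
    Agent.start_link(fn -> symbol end, name: :"#{__MODULE__}-#{symbol}")
  end
end

defmodule SketchSupervisor do
  # Starts a worker only when no process is registered for the symbol.
  def start_worker(symbol) do
    case Process.whereis(:"#{SketchWorker}-#{symbol}") do
      nil -> DynamicSupervisor.start_child(__MODULE__, {SketchWorker, symbol})
      pid -> {:ok, pid}
    end
  end

  # Terminates the worker only when it is running.
  def stop_worker(symbol) do
    case Process.whereis(:"#{SketchWorker}-#{symbol}") do
      nil -> :ok
      pid -> DynamicSupervisor.terminate_child(__MODULE__, pid)
    end
  end
end

{:ok, _sup} =
  DynamicSupervisor.start_link(strategy: :one_for_one, name: SketchSupervisor)

{:ok, pid} = SketchSupervisor.start_worker("NEOUSDT")
# Starting the same symbol again returns the already running process.
{:ok, ^pid} = SketchSupervisor.start_worker("NEOUSDT")
:ok = SketchSupervisor.stop_worker("NEOUSDT")
IO.inspect(Process.alive?(pid))
```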
Before we jump back to the naive application’s modules, we need to add the core application
to the dependencies of the naive application:
# /apps/naive/[Link]
defp deps do
  [
    {:binance, "~> 0.7.1"},
    {:binance_mock, in_umbrella: true},
    {:core, in_umbrella: true}, # <= core dep added
    ....
Let’s get back to the [Link] where we expect functions that we just cut
out to exist like start_trading/1 or stop_trading/1.
Let’s reimplement more generic versions of those functions as just simple calls to the [Link]
module:
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
...
def autostart_workers() do
  [Link].autostart_workers()
end

def start_worker(symbol) do
  [Link].start_worker(symbol)
end

def stop_worker(symbol) do
  [Link].stop_worker(symbol)
end
We also need to update the shutdown_trading/1 function as we removed all the private functions
that it relies on:
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
def shutdown_worker(symbol) when is_binary(symbol) do # <= updated name
  symbol = [Link](symbol)

  case [Link].get_pid(symbol) do # <= module added
    nil ->
      [Link]("Trading on #{symbol} already stopped")
      {:ok, _settings} = [Link].update_status(symbol, "off") # <= updated name + module

    _pid ->
      [Link]("Shutdown of trading on #{symbol} initialized")
When we moved all the private helper functions, we didn’t make them public, so the [Link]
module can’t use them - we will fix that now, together with temporary aliases/require/imports at the
top of the [Link]:
# /apps/core/lib/core/service_supervisor.ex
defmodule [Link] do

  require Logger # <= added require

  import [Link], only: [from: 2] # <= added import

  alias [Link] # <= added alias
  alias [Link] # <= added alias

  ...

  def get_pid(symbol) do # <= updated from private to public
    ...
  end

  def update_status(symbol, status) # <= updated from private to public
      when is_binary(symbol) and is_binary(status) do
    ...
  end
# /apps/core/[Link]
defp deps do
  [
    {:ecto_sql, "~> 3.0"}
  ]
end
# /apps/naive/lib/naive/[Link]
...
{Task,
 fn ->
   [Link].autostart_workers() # <= func name updated
 end}
...
# /apps/naive/lib/[Link]
alias [Link]

defdelegate start_trading(symbol), to: DynamicSymbolSupervisor, as: :start_worker
defdelegate stop_trading(symbol), to: DynamicSymbolSupervisor, as: :stop_worker
defdelegate shutdown_trading(symbol), to: DynamicSymbolSupervisor, as: :shutdown_worker
Believe it or not, but at this moment (ignoring all of the warnings because we created a circular
dependency between the core and the naive applications - which we will fix in the next steps) our
application runs just fine! We are able to start and stop trading, and autostarting works as well.
That’s all nice and to some extent understandable, but the [Link] module is not a
generic module. We can’t use it inside the streaming application to supervise the [Link]
processes.
So, what’s the point? Well, we can make it even more generic!
# /apps/core/lib/core/service_supervisor.ex
def fetch_symbols_to_start() do
  [Link](
    from(s in Settings,
      where: [Link] == "on",
      select: [Link]
    )
  )
end
The fetch_symbols_to_start/0 function uses Repo and Settings that are aliased at the top of the
[Link] module. This just won’t work with any other applications as Streamer will
require its own Repo and Settings modules etc.
To fix that we will pass both repo and schema as arguments to the fetch_symbols_to_start/0
function which will become fetch_symbols_to_start/2:
# /apps/core/lib/core/service_supervisor.ex
def fetch_symbols_to_start(repo, schema) do # <= args added
  [Link]( # <= lowercase `repo` is an argument not aliased module
    from(s in schema, # <= settings schema module passed as arg
      where: [Link] == "on",
      select: [Link]
    )
  )
end
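Passing a module as an argument works because a module name is just an atom, and a function call on a variable holding that atom is dispatched at runtime. A minimal sketch of the idea follows; `NaiveRepoSketch`, `StreamerRepoSketch`, `GenericSketch`, and the `:settings` atom are hypothetical stand-ins, not the real repo or schema modules, and no database is involved.

```elixir
defmodule NaiveRepoSketch do
  # Hypothetical stand-in for one application's repo.
  def all(:settings), do: ["BTCUSDT", "ETHUSDT"]
end

defmodule StreamerRepoSketch do
  # Hypothetical stand-in for another application's repo.
  def all(:settings), do: ["NEOUSDT"]
end

defmodule GenericSketch do
  # `repo` is a variable holding a module name (an atom), so the call
  # below goes to whichever module the caller passed in.
  def fetch_symbols_to_start(repo, schema) do
    repo.all(schema)
  end
end

IO.inspect(GenericSketch.fetch_symbols_to_start(NaiveRepoSketch, :settings))
IO.inspect(GenericSketch.fetch_symbols_to_start(StreamerRepoSketch, :settings))
```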
This will have a knock-on effect on any functions that are using fetch_symbols_to_start/0 - now
they need to use fetch_symbols_to_start/2 and pass appropriate Repo and Schema modules.
So, the fetch_symbols_to_start/0 is referenced by the autostart_workers/0 - we will need to
modify it to pass the repo and schema to the fetch_symbols_to_start/2 and as it’s inside the
[Link] module it needs to get them passed as arguments:
# /apps/core/lib/core/service_supervisor.ex
def autostart_workers(repo, schema) do # <= args added
  fetch_symbols_to_start(repo, schema) # <= args passed
  |> [Link](&start_worker/1)
end
Going even further down the line, autostart_workers/2 is called by the autostart_workers/0
inside the [Link] module. As this module is (naive) application-specific,
it is a place where repo and schema are known from the context - for the naive application repo is
the [Link] module and schema is the [Link] module:
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
...
def autostart_workers() do
  [Link].autostart_workers(
    [Link], # <= new value passed
    [Link] # <= new value passed
  )
end
This finishes the first of multiple paths that we need to follow to fully refactor the [Link]
module.
# /apps/core/lib/core/service_supervisor.ex
def update_status(symbol, status, repo, schema) # <= args added
    when is_binary(symbol) and is_binary(status) do
  repo.get_by(schema, symbol: symbol) # <= using dynamic repo and schema modules
  |> [Link](%{status: status})
  |> [Link]() # <= using dynamic repo module
end
As previously, we added repo and schema as arguments and modified the body of the function
to utilize them instead of hardcoded modules (aliased at the top of the [Link]
module).
In the same fashion as previously, we need to check “who” is using the update_status/2 and
update those calls to update_status/4. The function is used inside the start_worker/1 and the
stop_worker/1 inside the [Link] module, so as previously we need to bubble them
up (pass via arguments to both start_worker/1 and stop_worker/1 functions):
# /apps/core/lib/core/service_supervisor.ex
def start_worker(symbol, repo, schema) when is_binary(symbol) do # <= new args
  ...
  {:ok, _settings} = update_status(symbol, "on", repo, schema) # <= args passed
  ...
  {:ok, _settings} = update_status(symbol, "on", repo, schema) # <= args passed
  ...
end

def stop_worker(symbol, repo, schema) when is_binary(symbol) do # <= new args
  ...
  {:ok, _settings} = update_status(symbol, "off", repo, schema) # <= args passed
  ...
  {:ok, _settings} = update_status(symbol, "off", repo, schema) # <= args passed
  ...
end
# /apps/core/lib/core/service_supervisor.ex
def autostart_workers(repo, schema) do
  fetch_symbols_to_start(repo, schema)
  |> [Link](&start_worker(&1, repo, schema)) # <= args passed
end
Both the start_worker/3 and the stop_worker/3 functions are used by the functions inside the
[Link] module. We need to pass the Repo and Schema in the same fashion
as previously with the autostart_workers/2 function:
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
def start_worker(symbol) do
  [Link].start_worker(
    symbol,
    [Link], # <= new arg passed
    [Link] # <= new arg passed
  )
end

def stop_worker(symbol) do
  [Link].stop_worker(
    symbol,
    [Link], # <= new arg passed
    [Link] # <= new arg passed
  )
end
At this moment there’s no code inside the [Link] module referencing the aliased
Repo or Schema modules, so we can safely remove both aliases - definitely, we are moving in the
right direction!
By the way, our project still works at this stage: we can start/stop trading and it autostarts trading.
# /apps/core/lib/core/service_supervisor.ex
def get_pid(symbol) do
  [Link](:"[Link]-#{symbol}")
end
We can see that it has a hardcoded [Link] worker module - we need to make this
part dynamic by using the worker_module argument:
# /apps/core/lib/core/service_supervisor.ex
def get_pid(worker_module, symbol) do # <= arg added
  [Link](:"#{worker_module}-#{symbol}") # <= arg used
end
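To see what name such an interpolated atom produces, the lookup can be tried in isolation. This is a sketch under assumptions: `PidLookupSketch` and `Demo.Worker` are hypothetical names, and `Process.whereis/1` is assumed to be the lookup behind the call above.

```elixir
defmodule PidLookupSketch do
  # Builds the per-symbol process name from a module and a symbol.
  def name(worker_module, symbol), do: :"#{worker_module}-#{symbol}"

  # Returns the pid registered under that name, or nil.
  def get_pid(worker_module, symbol) do
    Process.whereis(name(worker_module, symbol))
  end
end

# `Demo.Worker` does not need to exist: a module name is just an atom
# whose string form starts with "Elixir.".
IO.inspect(PidLookupSketch.name(Demo.Worker, "NEOUSDT"))
# => :"Elixir.Demo.Worker-NEOUSDT"

# Nothing is registered under that name yet.
nil = PidLookupSketch.get_pid(Demo.Worker, "NEOUSDT")

# Register the current process under the name to see the lookup succeed.
true = Process.register(self(), PidLookupSketch.name(Demo.Worker, "NEOUSDT"))
IO.inspect(PidLookupSketch.get_pid(Demo.Worker, "NEOUSDT") == self())
```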
Moving up to functions that are referencing the get_pid/1 function, those will be the
start_worker/3 and the stop_worker/3 functions.
As those are the two last functions to be updated, we will look into them more closely to finish our
refactoring in this 3rd run. At this moment both need to accept worker_module as both are calling the
get_pid/2 function. Looking at both functions we can see three other hardcoded details:
• the log messages contain the word “trading” - we will replace it with text built from the
worker_module and symbol arguments
• there are two references to the [Link] which we will replace with
the module argument
• there is one more reference to the [Link] module which we will replace with
the worker_module argument
# /apps/core/lib/core/service_supervisor.ex
# module and worker_module args added vvvv
def start_worker(symbol, repo, schema, module, worker_module)
    when is_binary(symbol) do
  symbol = [Link](symbol)

  case get_pid(worker_module, symbol) do # <= worker_module passed
    nil ->
      [Link]("Starting #{worker_module} worker for #{symbol}") # <= dynamic text
      {:ok, _settings} = update_status(symbol, "on", repo, schema)
      {:ok, _pid} = DynamicSupervisor.start_child(module, {worker_module, symbol}) # <= args used

    pid ->
      [Link]("#{worker_module} worker for #{symbol} already started") # <= dynamic text
      {:ok, _settings} = update_status(symbol, "on", repo, schema)
      {:ok, pid}
  end
end

# module and worker_module added as args vvvv
def stop_worker(symbol, repo, schema, module, worker_module)
    when is_binary(symbol) do
  symbol = [Link](symbol)

  case get_pid(worker_module, symbol) do # <= worker_module passed
    nil ->
Again, as we modified start_worker/5 we need to make the last change inside the [Link]
module - autostart_workers/2 uses the start_worker/5 function:
# /apps/core/lib/core/service_supervisor.ex
def autostart_workers(repo, schema, module, worker_module) do # <= args added
  fetch_symbols_to_start(repo, schema)
  |> [Link](&start_worker(&1, repo, schema, module, worker_module)) # <= args added
end
# /apps/core/lib/core/service_supervisor.ex
defmodule [Link] do
  def autostart_workers(repo, schema, module, worker_module) do
    ...
  end

  def start_worker(symbol, repo, schema, module, worker_module)
      when is_binary(symbol) do
    ...
  end

  def stop_worker(symbol, repo, schema, module, worker_module)
      when is_binary(symbol) do
    ...
  end

  def get_pid(worker_module, symbol) do
    ...
  end

  def update_status(symbol, status, repo, schema)
      when is_binary(symbol) and is_binary(status) do
    ...
  end

  def fetch_symbols_to_start(repo, schema) do
    ...
  end
end
That finishes the 3rd round of updates inside the [Link] module. Now we need
to update the [Link] module to use the updated functions (and pass the
required arguments):
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
...
def autostart_workers() do
  [Link].autostart_workers(
    [Link],
    [Link],
    __MODULE__, # <= added arg
    [Link] # <= added arg
  )
end

def start_worker(symbol) do
  [Link].start_worker(
    symbol,
    [Link],
    [Link],
    __MODULE__, # <= added arg
    [Link] # <= added arg
  )
end

def stop_worker(symbol) do
  [Link].stop_worker(
    symbol,
    [Link],
    [Link],
    __MODULE__, # <= added arg
    [Link] # <= added arg
  )
end

def shutdown_worker(symbol) when is_binary(symbol) do
  symbol = [Link](symbol)

  case [Link].get_pid([Link], symbol) do # <= arg added
    nil ->
      [Link]("#{[Link]} worker for #{symbol} already stopped") # <= updated

      {:ok, _settings} =
        [Link].update_status(symbol, "off", [Link], [Link]) # <= args added

    _pid ->
      [Link]("Initializing shutdown of #{[Link]} worker for #{symbol}") # <= updated

      {:ok, settings} =
        [Link].update_status(
          symbol,
          "shutdown",
          [Link],
          [Link]
        ) # ^^^ additional args passed

      [Link](:settings_updated, settings)
      {:ok, settings}
  end
end
We needed to update references inside the shutdown_worker/1 function as well, as it calls the
get_pid/2 and update_status/4 functions.
We are now done with refactoring the [Link] module, it’s completely generic and
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
defmodule [Link] do
  use [Link]
To be able to use the [Link] module, it needs to provide the __using__/1 macro.
The simplest content of that macro would be to just use the DynamicSupervisor
inside:
# /apps/core/lib/core/service_supervisor.ex
defmacro __using__(opts) do
  [Link](opts)

  quote location: :keep do
    use DynamicSupervisor
  end
end
How does this work? As an oversimplification, you can think of it as Elixir looking through
the contents of the quote’s body (everything between quote ... do and end) in search of the unquote
function, which can inject dynamic content. All of this will become much clearer as we go
through the first example, but the important part is that after executing any potential unquotes
inside the quote’s body, Elixir will grab that code as if it were just a part of the code and place it
inside the [Link] module at compile time (we will also see the result of
[Link]/1 at compilation).
At this moment (after swapping to use [Link]) our code still works and it behaves exactly
as if we simply had use DynamicSupervisor inside the [Link]
module - at compilation, it will be swapped to it either way (as per the contents of the __using__/1
macro).
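The mechanics of `use` can be tried on a toy module first. In the sketch below, `GreeterSketch` and `SomeApp` are hypothetical names; the point is only that whatever the `quote` block contains ends up compiled into the module that calls `use`.

```elixir
defmodule GreeterSketch do
  # Everything inside `quote` is injected into the module that calls
  # `use GreeterSketch`.
  defmacro __using__(_opts) do
    quote do
      # `__MODULE__` is resolved in the using module, not in GreeterSketch.
      def hello(), do: "hello from #{inspect(__MODULE__)}"
    end
  end
end

defmodule SomeApp do
  use GreeterSketch
end

IO.puts(SomeApp.hello())
# => hello from SomeApp
```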
⁴⁵[Link]
As the autostart_workers/0 function is a part of the boilerplate, we will move it from the
[Link] module to the [Link] module, inside the
__using__/1 macro.
Ok, but it has all of those other naive application-specific arguments - where will we get those?
That’s what the opts argument inside the __using__/1 macro is for. When we call use [Link]
we can pass an additional keyword list which will contain all naive application-specific details:
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
defmodule [Link] do
  use [Link],
    repo: [Link],
    schema: [Link],
    module: __MODULE__,
    worker_module: [Link]
We can now update the __using__/1 macro to assign all of those details to variables (instead of using
[Link]/1):
# /apps/core/lib/core/service_supervisor.ex
defmacro __using__(opts) do
  {:ok, repo} = [Link](opts, :repo)
  {:ok, schema} = [Link](opts, :schema)
  {:ok, module} = [Link](opts, :module)
  {:ok, worker_module} = [Link](opts, :worker_module)
  ...
At this moment we can use those dynamic values to generate code that will be specific to the
implementation module, for example the autostart_workers/0 function that we moved from the [Link]
module and that will need different values passed to it (like [Link] as worker_module)
for the streamer application. We can see that it requires inserting those dynamic values inside the
autostart_workers/0, but how do we dynamically inject arguments? unquote to the rescue. When we
update the autostart_workers/0 function from:
to:
in the end generated code that will be “pasted” to the [Link] module at
compile time will be:
This way we can dynamically create functions for any application (for the streamer application, it
will generate a function call with the [Link], [Link] args, etc.).
We can apply that to all of the passed variables inside the autostart_workers/0 function - just for
reference, the full macro will look as follows:
# /apps/core/lib/core/service_supervisor.ex
defmacro __using__(opts) do
  {:ok, repo} = [Link](opts, :repo)
  {:ok, schema} = [Link](opts, :schema)
  {:ok, module} = [Link](opts, :module)
  {:ok, worker_module} = [Link](opts, :worker_module)

  quote location: :keep do
    use DynamicSupervisor

    def autostart_workers() do
      [Link].autostart_workers(
        unquote(repo),
        unquote(schema),
        unquote(module),
        unquote(worker_module)
      )
    end
  end
end
You can think about the above macro as one that substitutes the unquote(..) parts with the passed
values, then grabs the whole contents between quote ... do and end and pastes it into
the [Link] module at compile time - we can visualize the generated/“pasted”
code as:
This is exactly the code that we had before inside the [Link] module but
now it’s stored away inside the [Link]’s __using__/1 macro and it doesn’t need
to be implemented/copied across twice into two apps anymore.
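One way to look at the code that a `quote` block produces after its `unquote` calls are evaluated is to print it with `Macro.to_string/1`. The snippet below is a sketch, not the project's macro: `Demo.Repo`, `Demo.Worker`, and `Demo.Service` are hypothetical names, and the exact formatting of the printed code can differ between Elixir versions.

```elixir
# Values that would normally come from the options passed to `use`.
repo = Demo.Repo
worker_module = Demo.Worker

ast =
  quote do
    def autostart_workers() do
      # The unquoted values are substituted when the quote is evaluated.
      Demo.Service.autostart_workers(unquote(repo), unquote(worker_module))
    end
  end

# Prints the generated function with the module names already in place.
IO.puts(Macro.to_string(ast))
```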
We can now follow the same principle and move start_worker/1 and stop_worker/1 from the
[Link] module into __using__/1 macro inside the [Link]
module:
# /apps/core/lib/core/service_supervisor.ex
# inside the __using__/1 macro
...
def start_worker(symbol) do
  [Link].start_worker(
    symbol, # <= this needs to stay as variable
    unquote(repo),
    unquote(schema),
    unquote(module),
    unquote(worker_module)
  )
end

def stop_worker(symbol) do
  [Link].stop_worker(
    symbol, # <= this needs to stay as variable
    unquote(repo),
    unquote(schema),
    unquote(module),
    unquote(worker_module)
  )
end
Here we have an example of an execution time variable called symbol that we should not unquote as
it will be different per function call (source code should have symbol variable there not for example
"NEOUSDT").
# /apps/core/lib/core/service_supervisor.ex
# add below at the end of `quote` block inside `__using__/1`
defp get_pid(symbol) do
  [Link].get_pid(
    unquote(worker_module),
    symbol
  )
end

defp update_status(symbol, status) do
  [Link].update_status(
    symbol,
    status,
    unquote(repo),
    unquote(schema)
  )
end
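The difference between values injected with unquote (fixed when the using module is compiled) and ordinary variables such as symbol (supplied on every call) can be shown with two modules using the same macro. This is a simplified sketch: `ServiceSketch`, `NaiveSketch`, `StreamerSketch`, and the `Demo.*` worker names are hypothetical.

```elixir
defmodule ServiceSketch do
  # Generic function: needs the worker module passed explicitly.
  def describe(symbol, worker_module) do
    "#{inspect(worker_module)} worker for #{symbol}"
  end

  defmacro __using__(opts) do
    {:ok, worker_module} = Keyword.fetch(opts, :worker_module)

    quote do
      # `symbol` stays a runtime variable, while `worker_module` is
      # baked into the generated function at compile time.
      def describe(symbol) do
        ServiceSketch.describe(symbol, unquote(worker_module))
      end
    end
  end
end

defmodule NaiveSketch do
  use ServiceSketch, worker_module: Demo.NaiveWorker
end

defmodule StreamerSketch do
  use ServiceSketch, worker_module: Demo.StreamerWorker
end

IO.puts(NaiveSketch.describe("NEOUSDT"))
IO.puts(StreamerSketch.describe("BTCUSDT"))
```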
As the get_pid/1 and update_status/2 wrappers will get compiled and “pasted” into the
[Link] module, we can utilize them inside the shutdown_worker/1 function
as if they were much simpler naive application-specific local functions:
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
def shutdown_worker(symbol) when is_binary(symbol) do
  symbol = [Link](symbol)

  case get_pid(symbol) do # <= macro provided function
    nil ->
      [Link]("#{[Link]} worker for #{symbol} already stopped")
      {:ok, _settings} = update_status(symbol, "off") # <= macro provided function

    _pid ->
      [Link]("Initializing shutdown of #{[Link]} worker for #{symbol}")
      {:ok, settings} = update_status(symbol, "shutdown") # <= macro provided function
      [Link](:settings_updated, settings)
      {:ok, settings}
  end
end
And now, the very last change - I promise ;) Both the start_link/1 and the init/1 functions are still
referencing the DynamicSupervisor module, which could be a little bit confusing - let’s swap those
calls to use the [Link] module (both to not confuse people and to be consistent with
the use macro):
# /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
def start_link(init_arg) do
  [Link].start_link(__MODULE__, init_arg, name: __MODULE__)
end

def init(_init_arg) do
  [Link](strategy: :one_for_one)
end
As we don’t want/need to do anything different inside the [Link] module than the
DynamicSupervisor is doing we can just delegate both of those inside the [Link]
module:
# /apps/core/lib/core/service_supervisor.ex
defdelegate start_link(module, args, opts), to: DynamicSupervisor
defdelegate init(opts), to: DynamicSupervisor
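A delegated function simply forwards its arguments to the target module, so a supervisor started through such a facade behaves like one started through DynamicSupervisor directly. The following sketch illustrates this with hypothetical `FacadeSketch` and `DemoSupervisor` modules.

```elixir
defmodule FacadeSketch do
  # Both functions forward their arguments unchanged to DynamicSupervisor.
  defdelegate start_link(module, args, opts), to: DynamicSupervisor
  defdelegate init(opts), to: DynamicSupervisor
end

defmodule DemoSupervisor do
  use DynamicSupervisor

  def start_link(init_arg) do
    FacadeSketch.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    FacadeSketch.init(strategy: :one_for_one)
  end
end

{:ok, pid} = DemoSupervisor.start_link([])
# A freshly started dynamic supervisor has no children yet.
IO.inspect(DynamicSupervisor.count_children(pid))
```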
That finishes our refactoring of both the [Link] and the [Link]
modules.
We can test to confirm that everything works as expected:
$ iex -S mix
iex(1)> Naive.start_trading("NEOUSDT")
[Link].741 [info] Starting [Link] worker for NEOUSDT
[Link].768 [info] Starting new supervision tree to trade on NEOUSDT
{:ok, #PID<0.464.0>}
[Link].455 [info] Initializing new trader(1614462159452) for NEOUSDT
iex(2)> Naive.stop_trading("NEOUSDT")
[Link].362 [info] Stopping [Link] worker for NEOUSDT
{:ok,
 %[Link]{
   ...
 }}
iex(3)> Naive.start_trading("BTCUSDT")
[Link].689 [info] Starting [Link] worker for BTCUSDT
[Link].723 [info] Starting new supervision tree to trade on BTCUSDT
{:ok, #PID<0.475.0>}
[Link].182 [info] Initializing new trader(1614462251182) for BTCUSDT
BREAK: (a)bort (A)bort with dump (c)ontinue (p)roc info (i)nfo
The above test confirms that we can start, stop, and shut down trading on a symbol, and that
autostarting of trading works.
# /apps/streamer/[Link]
defp deps do
  [
    {:binance, "~> 0.7.1"},
    {:core, in_umbrella: true}, # <= core added to deps
    ...
# /apps/streamer/lib/streamer/dynamic_streamer_supervisor.ex
defmodule [Link] do
  use [Link],
    repo: [Link],
    schema: [Link],
    module: __MODULE__,
    worker_module: [Link]

  def start_link(init_arg) do
    [Link].start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  def init(_init_arg) do
    [Link](strategy: :one_for_one)
  end
end
Not much to add here - we are using the [Link] module and passing options to
it so that its macro can generate streamer application-specific wrappers (like start_worker/1 or
stop_worker/1 with the required repo, schema, etc.) around the generic logic from the [Link]
module.
Using the [Link] module will have an impact on the interface of the [Link],
as it will now provide functions like start_worker/1 instead of start_streaming/1 etc.
As with the naive application, we need to update the Task function inside the [Link]
module:
# /apps/streamer/lib/streamer/[Link]
...
{Task,
 fn ->
   [Link].autostart_workers()
 end}
...
# /apps/streamer/lib/[Link]
alias [Link]

defdelegate start_streaming(symbol), to: DynamicStreamerSupervisor, as: :start_worker
defdelegate stop_streaming(symbol), to: DynamicStreamerSupervisor, as: :stop_worker
We can run a quick test to confirm that indeed everything works as expected:
$ iex -S mix
iex(1)> Streamer.start_streaming("NEOUSDT")
[Link].813 [info] Starting [Link] worker for NEOUSDT
{:ok, #PID<0.465.0>}
iex(2)> Streamer.stop_streaming("NEOUSDT")
[Link].212 [info] Stopping [Link] worker for NEOUSDT
{:ok,
 %[Link]{
   __meta__: #[Link]<:loaded, "settings">,
   id: "db8c9429-2356-4243-a08f-0d0e89b74986",
   inserted_at: ~N[2021-02-25 [Link],
   status: "off",
   symbol: "NEOUSDT",
   updated_at: ~N[2021-02-27 [Link]
 }}
iex(3)> Streamer.start_streaming("LTCUSDT")
[Link].361 [info] Starting [Link] worker for LTCUSDT
{:ok, #PID<0.490.0>}
BREAK: (a)bort (A)bort with dump (c)ontinue (p)roc info (i)nfo
       (l)oaded (v)ersion (k)ill (D)b-tables (d)istribution
^C
$ iex -S mix
...
[Link].775 [info] Starting [Link] worker for LTCUSDT
This finishes the implementation for both the streamer and the naive application. We are generating
dynamic functions (metaprogramming) using Elixir macros, which is a cool exercise to go through
and feels like superpowers ;)
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github⁴⁶
⁴⁶[Link]
Chapter 14 - Store trade events and orders inside the database
Objectives
• overview of requirements
• create a new data_warehouse application in the umbrella
• connect to the database using Ecto
• store trade events’ data
• store orders’ data
• implement supervision
Overview of requirements
In the next chapter, we will move on to testing our strategy against historical data (aka backtesting,
which I will explain there). Before we can do that, we need both trade events and orders stored in
the database.
Let's start with the trade events. The streamer application could store trade events from Binance inside
its own database, but how would that work if we wanted to introduce another source of non-streamed
trade events (e.g. flat files or HTTP polling)? It would be better if the [Link] process
kept streaming those trade events as it does now, and a new application subscribed to the existing
trade_events:#{symbol} topic and stored them in the database.
A similar idea applies to the orders’ data. At this moment the naive application uses the Binance
module to place orders. We could store them inside the naive application’s database, but how would
that work if we wanted to introduce another trading strategy? Holding data in a separate database
for each strategy would complicate future reporting, auditing, etc.
To store trade events’ and orders’ data we will create a new application called data_warehouse
inside our umbrella project. It will subscribe to the trade_events:#{symbol} topic as well as the
orders:#{symbol} topic, convert the broadcast data to its own representations (structs), and store
it inside the database.
Trade events are already broadcast to the PubSub topic; orders, on the other hand, aren’t. We
will need to modify the [Link] module to broadcast the new and updated orders to the
orders:#{symbol} topic.
After implementing the basic worker that will store the incoming data(trade events and orders)
inside the database, we will look into adding a supervision tree utilizing Elixir Registry⁴⁷. It will
allow us to skip registering every worker with a unique atom and will offer an easy lookup to fetch
pids instead.
Create a new data_warehouse application in the umbrella
Let's start by creating a new application called data_warehouse inside our umbrella:
$ cd apps
$ mix new data_warehouse --sup
* creating [Link]
* creating .[Link]
* creating .gitignore
* creating [Link]
* creating lib
* creating lib/data_warehouse.ex
* creating lib/data_warehouse/[Link]
* creating test
* creating test/test_helper.exs
* creating test/data_warehouse_test.exs
...
Connect to the database using Ecto
To connect to the database we will use Ecto, so we need to add the database-related dependencies
(ecto_sql, ecto_enum and postgrex) to the deps of the new application:
# /apps/data_warehouse/[Link]
...
  defp deps do
    [
      {:ecto_sql, "~> 3.0"},
      {:ecto_enum, "~> 1.4"},
      {:phoenix_pubsub, "~> 2.0"},
      {:postgrex, ">= 0.0.0"},
      {:streamer, in_umbrella: true}
    ]
  end
...
⁴⁷[Link]
Additionally, we added the phoenix_pubsub package to be able to subscribe to the PubSub topic and
the streamer application to be able to use its [Link] struct.
We can now jump back to the terminal to install the added dependencies and generate a new [Link]
module:
$ mix [Link]
...
$ cd apps/data_warehouse
$ mix [Link] -r [Link]
* creating lib/data_warehouse
* creating lib/data_warehouse/[Link]
* updating ../../config/[Link]
Before we can create the migrations that will create our tables, we need to update the generated
configuration inside the config/[Link] file:
# /config/[Link]
...
config :data_warehouse, # <= added line
  ecto_repos: [[Link]] # <= added line

config :data_warehouse, [Link],
  database: "data_warehouse", # <= updated line
  username: "postgres", # <= updated line
  password: "postgres", # <= updated line
  hostname: "localhost"
...
and add the [Link] module to the children list of the [Link]’s
process:
# /apps/data_warehouse/lib/data_warehouse/[Link]
...
    children = [
      {[Link], []}
    ]
...
The last step will be to create a database by running the mix [Link] -r [Link]
command.
This finishes the setup of Ecto - we can now move on to the implementation of storing the orders
and the trade events.
Store trade events’ data
We will start by generating a new migration for the trade_events table:
$ cd apps/data_warehouse
$ mix [Link] create_trade_events
* creating priv/repo/migrations
* creating priv/repo/migrations/20210222224514_create_trade_events.exs
The [Link] struct will serve as a list of columns for our new trade_events
table. Here’s the full implementation of our migration:
# /apps/data_warehouse/priv/repo/migrations/20210222224514_create_trade_events.exs
defmodule [Link] do
  use [Link]

  def change do
    create table(:trade_events, primary_key: false) do
      add(:id, :uuid, primary_key: true)
      add(:event_type, :text)
      add(:event_time, :bigint)
      add(:symbol, :text)
      add(:trade_id, :integer)
      add(:price, :text)
      add(:quantity, :text)
      add(:buyer_order_id, :bigint)
      add(:seller_order_id, :bigint)
      add(:trade_time, :bigint)
      add(:buyer_market_maker, :bool)

      timestamps()
    end
  end
end
We added the additional id field to easily identify each trade event and our timestamps for
monitoring.
Let’s run the migration so it will create a new trade_events table for us:
$ mix [Link]
The next step will be to create a new directory called schema inside the
apps/data_warehouse/lib/data_warehouse directory. Inside it, we need to create a new schema file
called trade_event.ex. We can copy the same columns from the migration straight to the schema:
# /apps/data_warehouse/lib/data_warehouse/schema/trade_event.ex
defmodule [Link] do
  use [Link]

  @primary_key {:id, :binary_id, autogenerate: true}

  schema "trade_events" do
    field(:event_type, :string)
    field(:event_time, :integer)
    field(:symbol, :string)
    field(:trade_id, :integer)
    field(:price, :string)
    field(:quantity, :string)
    field(:buyer_order_id, :integer)
    field(:seller_order_id, :integer)
    field(:trade_time, :integer)
    field(:buyer_market_maker, :boolean)

    timestamps()
  end
end
At this moment we should be able to execute CRUD (create, read[select], update, delete) operations
on the table using the above struct.
As we are now able to store the trade events’ data inside the database, we can move on to
collecting it. Trade events are broadcast by the [Link] process here:
# /apps/streamer/lib/streamer/[Link]
...
    [Link](
      [Link],
      "trade_events:#{trade_event.symbol}",
      trade_event
    )
...
We will implement a subscriber process that will be given a PubSub topic and will store incoming
data inside the database.
Let’s start by creating a new folder called subscriber inside the
apps/data_warehouse/lib/data_warehouse directory together with a new file called [Link] inside it:
# /apps/data_warehouse/lib/data_warehouse/subscriber/[Link]
defmodule [Link] do
  use GenServer

  require Logger

  defmodule State do
    @enforce_keys [:topic]
    defstruct [:topic]
  end

  def start_link(topic) do
    GenServer.start_link(
      __MODULE__,
      topic,
      name: :"#{__MODULE__}-#{topic}"
    )
  end

  def init(topic) do
    {:ok,
     %State{
       topic: topic
     }}
  end
end
At this moment it’s just a bog-standard implementation of the GenServer with a state struct
containing a single key (:topic). We need to update the init/1 function to subscribe to the PubSub
topic:
# /apps/data_warehouse/lib/data_warehouse/subscriber/[Link]
  def init(topic) do
    [Link]("DataWarehouse worker is subscribing to #{topic}")

    [Link](
      [Link],
      topic
    )
    ...
Next, we need to add a handler for the incoming trade events:
# /apps/data_warehouse/lib/data_warehouse/subscriber/[Link]
  def handle_info(%[Link]{} = trade_event, state) do
    opts =
      trade_event
      |> Map.from_struct()

    struct!([Link], opts)
    |> [Link]()

    {:noreply, state}
  end
As in the case of the [Link], every incoming message triggers the handle_info/2 callback
with the contents of the message and the current state of the subscriber worker. We just convert the
incoming trade event to a map and then that map to the TradeEvent struct that gets inserted into
the database.
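The conversion step can be tried in isolation. The sketch below uses two hypothetical structs,
SourceEvent and StoredEvent (stand-ins for the streamer's struct and the schema, not part of the
project), to show that struct!/2 copies the matching keys and raises a KeyError when the map
carries a key that the target struct does not define:

```elixir
# Hypothetical stand-ins for the streamer struct and the schema struct.
defmodule SourceEvent do
  defstruct [:symbol, :price, :quantity]
end

defmodule StoredEvent do
  # The target has an extra field (like the schema's id); it stays nil.
  defstruct [:id, :symbol, :price, :quantity]
end

source = %SourceEvent{symbol: "XRPUSDT", price: "0.4307", quantity: "120.0"}

# Map.from_struct/1 drops the :__struct__ key, leaving a plain map.
fields = Map.from_struct(source)

# struct!/2 builds the target struct from that map.
stored = struct!(StoredEvent, fields)
IO.inspect(stored)

# A key unknown to the target struct makes struct!/2 raise a KeyError.
result =
  try do
    struct!(StoredEvent, Map.put(fields, :unknown_key, 1))
  rescue
    error in KeyError -> {:error, error.key}
  end

IO.inspect(result)
```

Using the strict struct!/2 here means that a new field added to the source struct without a
matching column would fail loudly instead of being silently dropped.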
This finishes the implementation of storing trade events, which we can test in the interactive shell
by running:
$ iex -S mix
...
iex(1)> Streamer.start_streaming("XRPUSDT")
[Link].147 [info] Starting [Link] worker for XRPUSDT
{:ok, #PID<0.395.0>}
iex(2)> [Link].start_link("trade_events:XRPUSDT")
[Link].204 [info] DataWarehouse worker is subscribing to trade_events:XRPUSDT
{:ok, #PID<0.405.0>}
As we can see in the above output, trade events are now getting stored inside the database.
Store orders’ data
As with the trade events, we will start by generating a new migration, this time for the orders table:
$ cd apps/data_warehouse
$ mix [Link] create_orders
* creating priv/repo/migrations/20210222224522_create_orders.exs
The list of columns for this table will be a copy of [Link]⁴⁸ struct returned from the Binance
exchange:
⁴⁸[Link]
# /apps/data_warehouse/priv/repo/migrations/20210222224522_create_orders.exs
defmodule [Link] do
  use [Link]

  def change do
    create table(:orders, primary_key: false) do
      add(:order_id, :bigint, primary_key: true)
      add(:client_order_id, :text)
      add(:symbol, :text)
      add(:price, :text)
      add(:original_quantity, :text)
      add(:executed_quantity, :text)
      add(:cummulative_quote_quantity, :text)
      add(:status, :text)
      add(:time_in_force, :text)
      add(:type, :text)
      add(:side, :text)
      add(:stop_price, :text)
      add(:iceberg_quantity, :text)
      add(:time, :bigint)
      add(:update_time, :bigint)

      timestamps()
    end
  end
end
We updated all of the shortened names like orig_qty to full names like original_quantity.
Let’s run the migration so it will create a new orders table for us:
$ mix [Link]
We can copy the above fields list to create a schema module. First, let’s create a new file called
[Link] inside the apps/data_warehouse/lib/data_warehouse/schema directory:
# /apps/data_warehouse/lib/data_warehouse/schema/[Link]
defmodule [Link] do
  use [Link]

  @primary_key {:order_id, :integer, autogenerate: false}

  schema "orders" do
    field(:client_order_id, :string)
    field(:symbol, :string)
    field(:price, :string)
    field(:original_quantity, :string)
    field(:executed_quantity, :string)
    field(:cummulative_quote_quantity, :string)
    field(:status, :string)
    field(:time_in_force, :string)
    field(:type, :string)
    field(:side, :string)
    field(:stop_price, :string)
    field(:iceberg_quantity, :string)
    field(:time, :integer)
    field(:update_time, :integer)

    timestamps()
  end
end
We can now add a handler to our [Link] that will convert the [Link]
struct to [Link] and store data inside the database:
# /apps/data_warehouse/lib/data_warehouse/subscriber/[Link]
  def handle_info(%[Link]{} = order, state) do
    data =
      order
      |> Map.from_struct()

    struct([Link], data)
    |> [Link](%{
      original_quantity: order.orig_qty,
      executed_quantity: order.executed_qty,
      cummulative_quote_quantity: order.cummulative_quote_qty,
      iceberg_quantity: order.iceberg_qty
    })
    |> [Link](
      on_conflict: :replace_all,
      conflict_target: :order_id
    )

    {:noreply, state}
  end
...
In the above code, we are copying the matching fields using the struct/2 function. Fields whose
names differ between the two structs (like orig_qty and original_quantity) won’t be copied, so we
need to merge them in a second step (using the [Link]/2 function). We are also using the
on_conflict: :replace_all option to make the insert/2 function act as if it were an upsert (to avoid
writing separate logic for inserting and updating the orders).
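The two-step copy can be illustrated without a database. The sketch below uses two hypothetical
structs, ApiOrder and StoredOrder (simplified stand-ins for the exchange's order struct and our
schema), and assumes Map.merge/2 for the second step. Unlike struct!/2, struct/2 silently ignores
the keys that the target does not define:

```elixir
# Hypothetical structs: ApiOrder mimics the shortened field names,
# StoredOrder mimics the schema with the full names.
defmodule ApiOrder do
  defstruct [:order_id, :symbol, :orig_qty, :executed_qty]
end

defmodule StoredOrder do
  defstruct [:order_id, :symbol, :original_quantity, :executed_quantity]
end

order = %ApiOrder{order_id: 1, symbol: "NEOUSDT", orig_qty: "5.241", executed_qty: "0.0"}

data = Map.from_struct(order)

# Step 1: struct/2 copies :order_id and :symbol and ignores :orig_qty and
# :executed_qty because StoredOrder does not define them.
copied = struct(StoredOrder, data)

# Step 2: the renamed fields have to be filled in explicitly.
stored =
  Map.merge(copied, %{
    original_quantity: order.orig_qty,
    executed_quantity: order.executed_qty
  })

IO.inspect(copied)
IO.inspect(stored)
```

After the first step the renamed fields are still nil, which is why the merge is required.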
Having all of this in place we will now be able to store broadcasted orders’ data in the database but
there’s nothing actually broadcasting them.
We need to modify the [Link] module to broadcast the [Link] whenever it places
buy/sell orders or fetches them again:
# /apps/naive/lib/naive/[Link]
...
    # inside placing initial buy order callback
    {:ok, %[Link]{} = order} =
      @binance_client.order_limit_buy(symbol, quantity, price, "GTC")

    :ok = broadcast_order(order)
...

    # inside buy order (partially) filled callback
    {:ok, %[Link]{} = current_buy_order} =
      @binance_client.get_order(
        symbol,
        timestamp,
        order_id
      )

    :ok = broadcast_order(current_buy_order)
...

    # inside the same callback in case of buy order filled
    {:ok, %[Link]{} = order} =
      @binance_client.order_limit_sell(symbol, quantity, sell_price, "GTC")

    :ok = broadcast_order(order)
...

    # inside sell order (partially) filled callback
    {:ok, %[Link]{} = current_sell_order} =
      @binance_client.get_order(
        symbol,
        timestamp,
        order_id
      )

    :ok = broadcast_order(current_sell_order)
...
The four places above pass either the [Link] or the [Link] struct - our
broadcast_order/1 function needs to be able to handle them both. Add the following at the bottom of the
[Link] module:
# /apps/naive/lib/naive/[Link]
  defp broadcast_order(%[Link]{} = response) do
    response
    |> convert_to_order()
    |> broadcast_order()
  end

  defp broadcast_order(%[Link]{} = order) do
    [Link](
      [Link],
      "orders:#{[Link]}",
      order
    )
  end

  defp convert_to_order(%[Link]{} = response) do
    data =
      response
      |> Map.from_struct()

    struct([Link], data)
    |> [Link](%{
      cummulative_quote_qty: "0.00000000",
      stop_price: "0.00000000",
      iceberg_qty: "0.00000000",
      is_working: true
    })
  end
As previously, the converting logic uses the struct/2 function, but it also merges in default values
for the fields that are missing from the [Link] struct (which is much smaller than the
[Link]).
At this moment we are able to store orders inside the database, and we can check that by running:
$ iex -S mix
...
iex(1)> [Link].start_link("orders:NEOUSDT")
[Link].043 [info] DataWarehouse worker is subscribing to orders:NEOUSDT
{:ok, #PID<0.400.0>}
iex(2)> Naive.start_trading("NEOUSDT")
[Link]39.741 [info] Starting [Link] worker for NEOUSDT
[Link].832 [info] Starting new supervision tree to trade on NEOUSDT
{:ok, #PID<0.402.0>}
[Link].654 [info] Initializing new trader(1614119921653) for NEOUSDT
iex(3)> Streamer.start_streaming("NEOUSDT")
[Link]23.786 [info] Starting [Link] worker for NEOUSDT
{:ok, #PID<0.412.0>}
[Link].187 [info] The trader(1614119921653) is placing a BUY order for NEOUSDT @ 37.549, quantity: 5.326
[Link].449 [info] The trader(1614119921653) is placing a SELL order for NEOUSDT @ 37.578, quantity: 5.326.
Querying the orders table (for example, using psql with the expanded display enabled) shows the
stored orders:
...
client_order_id            | C81E728D9D4C2F636F067F89CC14862C
symbol                     | NEOUSDT
price                      | 38.16
original_quantity          | 5.241
executed_quantity          | 0.00000000
cummulative_quote_quantity | 0.00000000
status                     | FILLED
time_in_force              | GTC
type                       | LIMIT
side                       | BUY
stop_price                 | 0.00000000
iceberg_quantity           | 0.00000000
time                       | 1614120906320
update_time                | 1614120906320
inserted_at                | 2021-02-23 [Link]
updated_at                 | 2021-02-23 [Link]
-[ RECORD 2 ]--------------+---------------------------------
order_id                   | 2
client_order_id            | ECCBC87E4B5CE2FE28308FD9F2A7BAF3
symbol                     | NEOUSDT
price                      | 38.19
original_quantity          | 5.241
executed_quantity          | 0.00000000
cummulative_quote_quantity | 0.00000000
status                     | NEW
time_in_force              | GTC
type                       | LIMIT
side                       | SELL
stop_price                 | 0.00000000
iceberg_quantity           | 0.00000000
time                       |
update_time                |
inserted_at                | 2021-02-23 [Link]
updated_at                 | 2021-02-23 [Link]
The first record above got inserted and then updated, as its status is “FILLED”. The second one hasn’t
been updated yet, as it’s still in the “NEW” status - that confirms that the upsert trick works.
That finishes the implementation of storing orders inside the database.
Implement supervision
Currently, we have a [Link] process that takes care of storing data
in the database, but sadly, if anything goes wrong inside our worker and it crashes, there’s no
supervision in place to restart it.
The supervision tree for the data_warehouse application will be similar to the ones from the naive and
streamer apps, but different enough to not use the [Link] abstraction.
For example, it doesn’t use the symbol column; it works based on the topic column. This
would require changes to the [Link]’s functions like update_status/4 or
fetch_symbols_to_start/2. We could update them to accept a column name, but that would need to be
passed through other functions. We can see that this is probably not the best approach, and the
further we get, the more complex it will become. The second issue is that we are registering all
processes with atom names, which can become problematic as the list of processes grows (as we
can imagine in the case of the data_warehouse application).
The better approach is to combine the DynamicSupervisor⁴⁹ with the Registry⁵⁰.
The DynamicSupervisor will supervise the [Link] processes, and instead of keeping track of
them by registering them using atoms, we will start them :via the Elixir Registry.
We will add all the functionality that we implemented for the naive and streamer applications. We will
provide functions to start and stop storing data from the passed PubSub topics, and we will store those
topics inside the database so that storing can be autostarted.
$ cd apps/data_warehouse
$ mix [Link] create_subscriber_settings
* creating priv/repo/migrations/20210227230123_create_subscriber_settings.exs
At this moment we can copy the code that creates the settings table (including the enum and the
index) from the streamer application and tweak it to fit the data_warehouse application. The first
important change (besides updating the namespaces from Streamer to DataWarehouse) is that
we have a setting per topic - not per symbol as in the naive and streamer applications:
⁴⁹[Link]
⁵⁰[Link]
# /apps/data_warehouse/priv/repo/migrations/20210227230123_create_subscriber_setting[Link]
defmodule [Link] do
  use [Link]

  alias [Link]

  def change do
    SubscriberStatusEnum.create_type()

    create table(:subscriber_settings, primary_key: false) do
      add(:id, :uuid, primary_key: true)
      add(:topic, :text, null: false)
      add(:status, [Link](), default: "off", null: false)

      timestamps()
    end

    create(unique_index(:subscriber_settings, [:topic]))
  end
end
Both schema and enum will be almost identical to the ones from the streamer application - we can
simply copy those files and apply basic tweaks like updating the namespace:
$ cp apps/streamer/lib/streamer/schema/[Link] \
    apps/data_warehouse/lib/data_warehouse/schema/subscriber_settings.ex
$ cp apps/streamer/lib/streamer/schema/streaming_status_enum.ex \
    apps/data_warehouse/lib/data_warehouse/schema/subscriber_status_enum.ex
Remember to update the symbol column to topic, as well as the table name, inside the [Link]:
# /apps/data_warehouse/lib/data_warehouse/schema/subscriber_settings.ex
defmodule [Link] do
  use [Link]

  alias [Link]

  @primary_key {:id, :binary_id, autogenerate: true}

  schema "subscriber_settings" do
    field(:topic, :string)
    field(:status, SubscriberStatusEnum)

    timestamps()
  end
end

# /apps/data_warehouse/lib/data_warehouse/schema/subscriber_status_enum.ex
import EctoEnum

defenum([Link], :subscriber_status, [:on, :off])

$ mix [Link]
At this moment we have all the pieces in place to execute queries on our new table. At this point, we
could think about a seeding script. For the data_warehouse specifically, we won’t need to provide
that script as we don’t know in advance what topic names we will use. Instead of seeding settings
in advance, our code will “upsert” (using the insert function) settings when start_storing/2 or
stop_storing/2 are called.
Everything looks very similar to the supervision tree that we created in the streamer and the naive
applications, but there’s an additional Registry that is supervised by the SubscriberSupervisor
process.
The idea is that inside the Worker module’s start_link/1 we will register worker processes using
a :via⁵¹ tuple. Internally, GenServer will utilize the Registry’s functions like register_name/2 to
add the process to the registry under the topic string. This way we will be able to retrieve pids
assigned to topics using those topic strings instead of registering each worker process with an atom
name.
Just as previously, the DynamicSupervisor will be in charge of supervising the Worker processes
and it won’t even be aware that we are using the Registry to keep track of the topic => pid association.
⁵¹[Link]
# /apps/data_warehouse/lib/data_warehouse/subscriber/dynamic_supervisor.ex
defmodule [Link] do
  use DynamicSupervisor

  def start_link(_arg) do
    DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_arg) do
    [Link](strategy: :one_for_one)
  end
end
As we will put all our logic related to autostarting, starting and stopping inside this module we can
already add aliases, import and require:
# /apps/data_warehouse/lib/data_warehouse/subscriber/dynamic_supervisor.ex
  require Logger

  alias [Link]
  alias [Link]
  alias [Link]

  import [Link], only: [from: 2]

  @registry :subscriber_workers
Additionally, we added the @registry module attribute that we will use to retrieve pid for the specific
topic.
We can move on to implementing autostart_workers/0 which will look very similar to the ones
that we implemented in the streamer and the naive applications:
# /apps/data_warehouse/lib/data_warehouse/subscriber/dynamic_supervisor.ex
...
  def autostart_workers() do
    [Link](
      from(s in SubscriberSettings,
        where: [Link] == "on",
        select: [Link]
      )
    )
    |> [Link](&start_child/1)
  end

  defp start_child(args) do
    DynamicSupervisor.start_child(
      __MODULE__,
      {Worker, args}
    )
  end
We can see that we are querying the database for a list of topics (not symbols) and calling
start_child/1 for each result.
The start_worker/1 function is where the Registry will shine, as we won’t need to check whether
there’s already a process running for that topic - we can leave that check to the Registry. If there’s
a process already running for that topic, it will just return a tuple starting with the :error atom:
# /apps/data_warehouse/lib/data_warehouse/subscriber/dynamic_supervisor.ex
...
  def start_worker(topic) do
    [Link]("Starting storing data from #{topic} topic")
    update_status(topic, "on")
    start_child(topic)
  end
...
  defp update_status(topic, status)
       when is_binary(topic) and is_binary(status) do
    %SubscriberSettings{
      topic: topic,
      status: status
    }
    |> [Link](
      on_conflict: :replace_all,
      conflict_target: :topic
    )
  end
As we are not seeding the database with the default settings, we will use the insert/2 function with
options (as previously) to make it work as if it were an “upsert” function.
The last function in this module will be stop_worker/1, which uses the private stop_child/1 function.
The stop_child/1 function shows how to retrieve the pid of the process assigned to the passed topic:
# /apps/data_warehouse/lib/data_warehouse/subscriber/dynamic_supervisor.ex
...
  def stop_worker(topic) do
    [Link]("Stopping storing data from #{topic} topic")
    update_status(topic, "off")
    stop_child(topic)
  end
...
  defp stop_child(args) do
    case [Link](@registry, args) do
      [{pid, _}] -> DynamicSupervisor.terminate_child(__MODULE__, pid)
      _ -> [Link]("Unable to locate process assigned to #{inspect(args)}")
    end
  end
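The lookup-and-terminate step can be tried on its own with nothing but the standard library. In the
sketch below, DemoWorker, :demo_registry and :demo_supervisor are hypothetical names used only for
illustration, and the sketch assumes that Registry.lookup/2 is the lookup call being made:

```elixir
# Hypothetical worker used only for this sketch.
defmodule DemoWorker do
  use GenServer

  def start_link(topic) do
    GenServer.start_link(__MODULE__, topic, name: {:via, Registry, {:demo_registry, topic}})
  end

  @impl true
  def init(topic), do: {:ok, topic}
end

{:ok, _} = Registry.start_link(keys: :unique, name: :demo_registry)
{:ok, _} = DynamicSupervisor.start_link(strategy: :one_for_one, name: :demo_supervisor)

{:ok, pid} = DynamicSupervisor.start_child(:demo_supervisor, {DemoWorker, "orders:NEOUSDT"})

# Registry.lookup/2 returns a list of {pid, value} tuples for the key.
[{found_pid, _value}] = Registry.lookup(:demo_registry, "orders:NEOUSDT")

# The supervisor only needs the pid to stop the child.
:ok = DynamicSupervisor.terminate_child(:demo_supervisor, found_pid)

# A key that was never registered yields an empty list.
missing = Registry.lookup(:demo_registry, "orders:UNKNOWN")
IO.inspect(missing)
```

The empty list returned for an unknown key is what the catch-all clause of the case expression
handles.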
Inside the Worker module, we need to update the start_link/1 function to register the process
through the Registry:
# /apps/data_warehouse/lib/data_warehouse/subscriber/[Link]
...
  def start_link(topic) do
    GenServer.start_link(
      __MODULE__,
      topic,
      name: via_tuple(topic)
    )
  end
...
  defp via_tuple(topic) do
    {:via, Registry, {:subscriber_workers, topic}}
  end
...
By passing a :via tuple as the :name option to the GenServer’s start_link/3 function, we instruct it
to utilize the Registry module to register processes under topic names.
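A minimal, self-contained sketch of :via registration is shown below. TopicWorker, get_topic/1 and
:sketch_workers are hypothetical names; only Registry and GenServer from the standard library are
used. It shows that callers can address a process by its topic, and that starting a second process
under the same key fails:

```elixir
defmodule TopicWorker do
  use GenServer

  def start_link(topic) do
    GenServer.start_link(__MODULE__, topic, name: via_tuple(topic))
  end

  # Callers address the process by topic; no pid or atom name is needed.
  def get_topic(topic), do: GenServer.call(via_tuple(topic), :get_topic)

  @impl true
  def init(topic), do: {:ok, topic}

  @impl true
  def handle_call(:get_topic, _from, topic), do: {:reply, topic, topic}

  defp via_tuple(topic), do: {:via, Registry, {:sketch_workers, topic}}
end

# The registry has to be running before any worker registers in it.
{:ok, _registry} = Registry.start_link(keys: :unique, name: :sketch_workers)

{:ok, pid} = TopicWorker.start_link("trade_events:XRPUSDT")

# Registering the same key twice fails, which guards against starting
# two workers for one topic.
second = TopicWorker.start_link("trade_events:XRPUSDT")
IO.inspect(second)

reply = TopicWorker.get_topic("trade_events:XRPUSDT")
IO.inspect(reply)
```

Because topics are plain strings, no new atoms are created for each worker, which is the main
advantage over atom-based names.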
# /apps/data_warehouse/lib/data_warehouse/subscriber_supervisor.ex
defmodule [Link] do
  use Supervisor

  alias [Link]

  @registry :subscriber_workers

  def start_link(_args) do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_args) do
    children = [
      {Registry, [keys: :unique, name: @registry]},
      {DynamicSupervisor, []},
      {Task,
       fn ->
         DynamicSupervisor.autostart_workers()
       end}
    ]

    [Link](children, strategy: :rest_for_one)
  end
end
The important part here is to match the Registry name to the one defined inside the DynamicSupervisor
and the Worker modules.
# /apps/data_warehouse/lib/data_warehouse/[Link]
...
  def start(_type, _args) do
    children = [
      {[Link], []},
      {[Link], []} # <= new module added
    ]

    # See [Link]
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: [Link]] # <= name updated
    Supervisor.start_link(children, opts)
  end
...
Add interface
The final step will be to add an interface to the DataWarehouse application to start and stop storing:
# /apps/data_warehouse/lib/data_warehouse.ex
  alias [Link]

  def start_storing(stream, symbol) do
    DynamicSupervisor.start_worker("#{[Link](stream)}:#{[Link](symbol)}")
  end

  def stop_storing(stream, symbol) do
    DynamicSupervisor.stop_worker("#{[Link](stream)}:#{[Link](symbol)}")
  end
Inside the above functions, we are just doing a couple of sanity checks on the case of the passed
arguments assuming that topics follow the rule of downcased stream + : + upcased symbol.
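The naming rule can be checked in isolation. The sketch below uses a hypothetical TopicName helper
and assumes that String.downcase/1 and String.upcase/1 are the functions doing the normalization,
as the description above suggests:

```elixir
defmodule TopicName do
  # Hypothetical helper: downcased stream + ":" + upcased symbol.
  def build(stream, symbol) do
    "#{String.downcase(stream)}:#{String.upcase(symbol)}"
  end
end

IO.puts(TopicName.build("Trade_Events", "neousdt"))
IO.puts(TopicName.build("orders", "NEOUSDT"))
```

Normalizing the case in one place means that "neousdt" and "NEOUSDT" always map to the same topic,
so they also map to the same Registry key and the same row in the settings table.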
Test
The interface above was the last step in our implementation. We can now test that everything works
as expected:
$ iex -S mix
...
iex(1)> DataWarehouse.start_storing("trade_events", "NEOUSDT")
[Link].740 [info] Starting storing data from trade_events:NEOUSDT topic
[Link].847 [info] DataWarehouse worker is subscribing to trade_events:NEOUSDT
{:ok, #PID<0.429.0>}
iex(2)> DataWarehouse.start_storing("trade_events", "NEOUSDT")
[Link].753 [info] Starting storing data from trade_events:NEOUSDT topic
{:error, {:already_started, #PID<0.459.0>}}
iex(3)> DataWarehouse.start_storing("orders", "NEOUSDT")
[Link].386 [info] Starting storing data from orders:NEOUSDT topic
[Link].403 [info] DataWarehouse worker is subscribing to orders:NEOUSDT
{:ok, #PID<0.431.0>}
BREAK: (a)bort (A)bort with dump (c)ontinue (p)roc info (i)nfo
       (l)oaded (v)ersion (k)ill (D)b-tables (d)istribution
^C%
$ iex -S mix
...
[Link].058 [info] DataWarehouse worker is subscribing to trade_events:NEOUSDT
[Link].062 [info] DataWarehouse worker is subscribing to orders:NEOUSDT
# autostart works ^^^
iex(1)> Naive.start_trading("NEOUSDT")
[Link].316 [info] Starting [Link] worker for NEOUSDT
[Link].417 [info] Starting new supervision tree to trade on NEOUSDT
{:ok, #PID<0.419.0>}
iex(3)>
[Link].484 [info] Initializing new trader(1615221407466) for NEOUSDT
iex(2)> Streamer.start_streaming("NEOUSDT")
[Link].660 [info] Starting [Link] worker for NEOUSDT
{:ok, #PID<0.428.0>}
...
iex(3)> DataWarehouse.stop_storing("trade_events", "NEOUSDT")
[Link].398 [info] Stopping storing data from trade_events:NEOUSDT topic
:ok
iex(4)> DataWarehouse.stop_storing("trade_events", "NEOUSDT")
[Link].151 [info] Stopping storing data from trade_events:NEOUSDT topic
[Link].160 [warn] Unable to locate process assigned to "trade_events:NEOUSDT"
:ok
iex(5)> [{pid, nil}] = [Link](:subscriber_workers, "orders:NEOUSDT")
[{#PID<0.417.0>, nil}]
iex(6)> [Link](pid, :crash)
true
[Link].812 [info] DataWarehouse worker is subscribing to orders:NEOUSDT
As we can see, even this simple implementation handles starting, autostarting and stopping. It also
gracefully handles starting a worker when one is already running, as well as stopping when there is
none running.
As a challenge, you could update the naive and the streamer applications to use the Registry and
remove the [Link] module, as it is superseded by the above solution.
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github⁵²
⁵²[Link]
Chapter 15 - Backtest trading strategy
Objectives
• overview of requirements
• implement the storing task
• test the backtesting
Overview of requirements
In the last chapter, we started storing trade events and orders in the database, which will be crucial
for backtesting - the focus of this chapter.
Backtesting is a procedure of running historical data through the system and observing how our
strategy would have performed if we had run it “in the past”. Backtesting works on the assumption
that the market will behave in the future in a similar fashion as it did in the past.
At this moment we are receiving the trade events from Binance through a WebSocket. The
[Link] process handles those messages by parsing them from a JSON string to a map,
then converting them to structs and broadcasting them to the trade_events:#{symbol} PubSub topic.
The [Link] subscribes to the trade_events:#{symbol} topic and makes decisions based on
incoming data. As it places buy and sell orders, it broadcasts them to the orders:#{symbol} PubSub
topic. The [Link] processes subscribe to both the trade events and the orders
topics and store incoming data inside the database - we can visualize that flow like this:
To backtest, we can substitute the [Link] process with a Task that will stream trade
events’ data from the database and broadcast it to the trade_events:#{symbol} PubSub topic (the
same topic that the [Link] process uses).
From the perspective of the [Link], it does not make any difference who is broadcasting
those trade events. This should be a clear indication of the value of the publish/subscribe model that
we implemented from the beginning. It allows us to swap producers and consumers freely to backtest
our trading strategies:
Task feeds the stream of data from the database straight to the PubSub topic
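The producer swap can be illustrated with a dependency-free sketch. It uses a Registry with
duplicate keys as a stand-in for the PubSub (this is not what the project uses; it only keeps the
example self-contained), and the names :sketch_pubsub and broadcast are hypothetical. The consumer
receives identical messages regardless of which producer sent them:

```elixir
# A Registry with duplicate keys acts here as a minimal local pub/sub.
{:ok, _} = Registry.start_link(keys: :duplicate, name: :sketch_pubsub)

topic = "trade_events:NEOUSDT"

# The consumer (this process) subscribes once and never learns who publishes.
{:ok, _} = Registry.register(:sketch_pubsub, topic, [])

broadcast = fn message ->
  Registry.dispatch(:sketch_pubsub, topic, fn entries ->
    for {pid, _value} <- entries, do: send(pid, message)
  end)
end

# Two interchangeable producers: a "live" one and a "backtest" one.
live = Task.async(fn -> broadcast.({:trade_event, "0.4307"}) end)
Task.await(live)

backtest = Task.async(fn -> broadcast.({:trade_event, "0.4310"}) end)
Task.await(backtest)

received =
  for _ <- 1..2 do
    receive do
      {:trade_event, price} -> price
    after
      1_000 -> :timeout
    end
  end

IO.inspect(received)
```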
Implement the storing task
We will start by creating a new module based on Task inside the data_warehouse application:
# /apps/data_warehouse/lib/data_warehouse/[Link]
defmodule [Link] do
  use Task

  def start_link(arg) do
    Task.start_link(__MODULE__, :run, [arg])
  end

  def run(arg) do
    # ...
  end
end
To be able to query the database we will import Ecto and require Logger for logging:
# /apps/data_warehouse/lib/data_warehouse/[Link]
...
  import [Link], only: [from: 2]

  require Logger
...
We can now modify the run/1 function to expect specific type, symbol, from, to and interval:
# /apps/data_warehouse/lib/data_warehouse/[Link]
...
  def run(%{
        type: :trade_events,
        symbol: symbol,
        from: from,
        to: to,
        interval: interval
      }) do
    ...
Inside the body of the run/1 function, we will first convert from and to into Unix timestamps (in
milliseconds) by using a private helper function:
# /apps/data_warehouse/lib/data_warehouse/[Link]
  ...
  def run(%{
        ...
      }) do
    from_ts =
      "#{from}T[Link].000Z"
      |> convert_to_ms()

    to_ts =
      "#{to}T[Link].000Z"
      |> convert_to_ms()
  end
  ...
  # Converts an ISO 8601 date-time string to a Unix timestamp in milliseconds
  defp convert_to_ms(iso8601_date_string) do
    iso8601_date_string
    |> NaiveDateTime.from_iso8601!()
    |> DateTime.from_naive!("Etc/UTC")
    |> DateTime.to_unix(:millisecond)
  end
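As a worked example of the conversion above, the sketch below converts the two dates used later in this chapter. It assumes that the time suffix appended to each date is midnight UTC (`T00:00:00.000Z`); `TimestampSketch` and `day_start_ms/1` are hypothetical names used only for illustration.

```elixir
defmodule TimestampSketch do
  # Assumption: each date is interpreted as midnight UTC of that day.
  def day_start_ms(date) do
    "#{date}T00:00:00.000Z"
    |> NaiveDateTime.from_iso8601!()
    |> DateTime.from_naive!("Etc/UTC")
    |> DateTime.to_unix(:millisecond)
  end
end

from_ts = TimestampSketch.day_start_ms("2019-06-02")
to_ts = TimestampSketch.day_start_ms("2019-06-04")

# from_ts is 1_559_433_600_000 and to_ts is 1_559_606_400_000,
# so the window spans exactly two days' worth of milliseconds.
window_ms = to_ts - from_ts
```

Under that assumption, and because the query compares with `>=` for the lower bound and `<` for the upper bound, such a window would cover 2019-06-02 and 2019-06-03 but nothing from 2019-06-04.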
Next, we will select data from the database. Because we could be selecting hundreds of thousands of
rows and we are broadcasting them to the PubSub every x ms, it could take a substantial amount of
time to broadcast all of them. Instead of selecting the data and storing all of it in memory, we will
use the [Link]/1 function to keep broadcasting it on the go. Additionally, we will add an index to
the data to be able to log an info message every 10k messages. Because the stream has to be consumed
inside a transaction, the last thing that we need to define is the transaction's timeout value.
Ecto's default is 15 seconds and we will change it to :infinity:
# /apps/data_warehouse/lib/data_warehouse/[Link]
  def run(%{
        ...
      }) do
    ...

    [Link](
      fn ->
        from(te in [Link],
          where:
            [Link] == ^symbol and
              te.trade_time >= ^from_ts and
              te.trade_time < ^to_ts,
          order_by: te.trade_time
        )
        |> [Link]()
        # Stream.with_index/1 keeps the pipeline lazy; Enum.with_index/1
        # would load every row into memory before the first broadcast
        |> Stream.with_index()
        |> Enum.each(fn {row, index} ->
          :[Link](interval)

          if rem(index, 10_000) == 0 do
            [Link]("Publisher broadcasted #{index} events")
          end

          publishTradeEvent(row)
        end)
      end,
      timeout: :infinity
    )

    [Link]("Publisher finished streaming trade events")
  end
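The throttled, indexed iteration used above can be tried out without a database. The following is a minimal sketch under stated assumptions: `ThrottledStreamSketch` is a hypothetical module, a lazy `Stream` over a range stands in for the rows coming from the database, `IO.puts/1` stands in for the logger, and the `publish` callback stands in for the broadcasting helper.

```elixir
defmodule ThrottledStreamSketch do
  # `events` is any enumerable (ideally a lazy stream), `interval` is the pause
  # in milliseconds before each event, and `publish` is a one-argument callback.
  def run(events, interval, publish, log_every \\ 10_000) do
    events
    |> Stream.with_index()
    |> Enum.each(fn {event, index} ->
      Process.sleep(interval)

      # Logs at index 0, log_every, 2 * log_every and so on
      if rem(index, log_every) == 0 do
        IO.puts("Publisher broadcasted #{index} events")
      end

      publish.(event)
    end)
  end
end

parent = self()

# A lazy stream of 25 fake events; nothing is computed until it is consumed.
events = Stream.map(1..25, fn n -> %{id: n} end)

:ok =
  ThrottledStreamSketch.run(
    events,
    0,
    fn event -> send(parent, {:event, event.id}) end,
    10
  )

# Collect what the callback "published", in arrival order.
ids =
  for _ <- 1..25 do
    receive do
      {:event, id} -> id
    after
      0 -> nil
    end
  end
```

Because both `Stream.map/2` and `Stream.with_index/1` are lazy, each event is produced, delayed and published one at a time rather than being materialized as a full list first.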
Finally, the above code uses the publishTradeEvent/1 helper function, which converts the
DataWarehouse’s TradeEvent to the Streamer’s TradeEvent so that we broadcast the same structs as the
streamer application:
# /apps/data_warehouse/lib/data_warehouse/[Link]
  ...
  defp publishTradeEvent(%[Link]{} = trade_event) do
    new_trade_event =
      struct(
        [Link],
        trade_event |> Map.to_list()
      )

    [Link](
      [Link],
      "trade_events:#{trade_event.symbol}",
      new_trade_event
    )
  end
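The struct-to-struct conversion above relies on how `struct/2` treats its second argument: keys that the target struct does not define are discarded, and the source's `__struct__` key is ignored. A minimal sketch with two hypothetical structs (they are stand-ins and not the application's real schemas):

```elixir
defmodule StoredTradeEvent do
  # Hypothetical stand-in for the database-backed struct; :id exists only here.
  defstruct [:id, :symbol, :price, :quantity]
end

defmodule BroadcastTradeEvent do
  # Hypothetical stand-in for the struct that subscribers expect.
  defstruct [:symbol, :price, :quantity]
end

stored = %StoredTradeEvent{id: 1, symbol: "XRPUSDT", price: "0.44391", quantity: "450.5"}

# Map.to_list/1 yields every key, including :__struct__ and :id;
# struct/2 keeps only the keys that BroadcastTradeEvent defines.
converted = struct(BroadcastTradeEvent, Map.to_list(stored))
```

The result is a `BroadcastTradeEvent` carrying the shared fields, with no `:id` key.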
This finishes our implementation - we should be able to stream trade events from the database to
the PubSub using the above Task which we will do below.
$ cd /tmp
$ wget [Link]
[Link]
We can now uncompress the archive and load those trade events into our database:
$ gunzip [Link]
$ PGPASSWORD=postgres psql -Upostgres -h localhost -ddata_warehouse -c "\COPY trade_events FROM '/tmp/[Link]' WITH (FORMAT csv, delimiter ';');"
COPY 206115
The number after the word COPY in the response indicates the number of rows that got copied into
the database.
We can now give it a try and run full backtesting but first let’s clean the orders table:
We can now start a new iex session where we will start trading (the naive application) as well as
storing orders (the data_warehouse application). Instead of starting the [Link] worker,
we will start the [Link] task with arguments matching the imported day and
symbol:
$ iex -S mix
...
iex(1)> DataWarehouse.start_storing("orders", "XRPUSDT")
[Link].596 [info] Starting storing data from orders:XRPUSDT topic
[Link].632 [info] DataWarehouse worker is subscribing to orders:XRPUSDT
{:ok, #PID<0.417.0>}
iex(2)> Naive.start_trading("XRPUSDT")
[Link].293 [info] Starting [Link] worker for XRPUSDT
[Link].332 [info] Starting new supervision tree to trade on XRPUSDT
{:ok, #PID<0.419.0>}
[Link].327 [info] Initializing new trader(1615288698325) for XRPUSDT
iex(3)> [Link].start_link(%{type: :trade_events, symbol: "XRPUSDT", from: "2019-06-02", to: "2019-06-04", interval: 5})
{:ok, #PID<0.428.0>}
[Link].532 [info] Publisher broadcasted 0 events
[Link].534 [info] The trader(1615288698325) is placing a BUY order for XRPUSDT @ 0.44391, quantity: 450.5
[Link].749 [info] The trader(1615288698325) is placing a SELL order for XRPUSDT @ 0.44426, quantity: 450.5.
...
[Link].568 [info] Publisher broadcasted 10000 events
...
[Link].571 [info] Publisher broadcasted 20000 events
[Link].576 [info] Publisher broadcasted 30000 events
...
[Link].875 [info] Publisher broadcasted 200000 events
[Link].576 [info] Publisher finished streaming trade events
From the above log, we can see that it took about 20 minutes to run 206k records through the system
(most of that time, 17+ minutes, was spent in the 5 ms sleep between events).
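The 17+ minutes figure follows directly from the number of imported rows and the interval passed to the publisher. A quick check of that arithmetic:

```elixir
# Numbers taken from the COPY output (206115 rows) and the interval: 5 argument.
events = 206_115
interval_ms = 5

# Total time spent sleeping, in milliseconds and in minutes.
sleep_ms = events * interval_ms
sleep_minutes = sleep_ms / 60_000
```

This gives 1,030,575 ms, a little over 17 minutes of sleeping. The rest of the run is spent on the actual work of querying, broadcasting, trading and storing orders.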
After the streaming finishes, we can check the orders table inside the database to figure out how
many trades we made and what income they generated.
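As a rough illustration of that kind of calculation, the sketch below sums up a single buy and sell pair using the prices and quantity visible in the log above. The map keys (`side`, `price`, `quantity`) are hypothetical and do not necessarily match the columns of the orders table, trading fees are ignored, and floats are used only to keep the example dependency-free.

```elixir
# Illustrative orders based on the BUY and SELL lines from the log above.
orders = [
  %{side: "BUY", price: 0.44391, quantity: 450.5},
  %{side: "SELL", price: 0.44426, quantity: 450.5}
]

# Buying spends quote currency, selling earns it back.
income =
  Enum.reduce(orders, 0.0, fn
    %{side: "BUY"} = order, acc -> acc - order.price * order.quantity
    %{side: "SELL"} = order, acc -> acc + order.price * order.quantity
  end)

# Every completed trade consists of one BUY and one SELL order.
trades = Enum.count(orders, fn order -> order.side == "SELL" end)
```

For this single pair the gross income is about 0.1577 USDT, that is (0.44426 - 0.44391) * 450.5, before any fees.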
By looking at the orders, we can figure out some performance metrics, but that is far from ideal for
answering simple questions like “what’s the performance of my strategy?”. We will address that
and other concerns in future chapters.
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found on GitHub⁵³
⁵³[Link]