Create A Cryptocurrency Trading Bot in Elixir

This book provides a comprehensive guide to creating a cryptocurrency trading bot using Elixir. It covers various topics including connecting to Binance's WebSocket, implementing trading strategies, and enabling parallel trading. The content is structured into chapters that progressively build on the bot's functionality and performance enhancements.

Create a cryptocurrency trading bot in Elixir

Kamil Skowron
This book is for sale at [Link]

This version was published on 2021-03-09

This is a Leanpub book. Leanpub empowers authors and publishers with the Lean Publishing
process. Lean Publishing is the act of publishing an in-progress ebook using lightweight tools and
many iterations to get reader feedback, pivot until you have the right book and build traction once
you do.

© 2020 - 2021 Kamil Skowron


To my wife Sandra for putting up with my crazy ideas
Contents

Preface . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 1

Who this book is for . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 2

What this book covers . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 3

Source code . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 7

Chapter 1 - Stream live crypto prices from Binance WSS . . . . . . . . . . . . . . . . . . . . . 8


Objectives . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 8
Create an umbrella app . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 8
Create a supervised application inside an umbrella . . . . . . . . . . . . . . . . . . . . . . . . 8
Connect to Binance’s WebSocket Stream using the WebSockex module . . . . . . . . . . . . 8
Decode incoming events using the Jason module . . . . . . . . . . . . . . . . . . . . . . . . . 12

Chapter 2 - Create a naive trading strategy - a single trader process without supervision 18
Objectives . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 18
Initialization . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 18
How will the trading strategy work? . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 22

Chapter 3 - Introduce PubSub as a communication method . . . . . . . . . . . . . . . . . . . . 30


Objectives . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 30
Design . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 30
Implementation . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 33

Chapter 4 - Mock the Binance API . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 36


Objectives . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 36
Design . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 36
Create “BinanceMock” app . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 39
Implement getting exchange info . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 40
Implement placing buy and sell orders . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 40
Implement order retrieval . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 45
Implement callback for incoming trade events . . . . . . . . . . . . . . . . . . . . . . . . . . . 46
Upgrade trader and config . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 49
Test the implementation . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 51

Chapter 5 - Enable parallel trading on multiple symbols . . . . . . . . . . . . . . . . . . . . . . 52


Objectives . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 52
Introduction - architectural design . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 52
Implement [Link] . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 57
Implement [Link] . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 58

Chapter 6 - Introduce a buy_down_interval to make a single trader more profitable . . . . 68


Objectives . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 68
Why we need to buy below the current price? Feature overview . . . . . . . . . . . . . . . . 69
[Link] implementation . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 70
[Link] implementation . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 72

Chapter 7 - Introduce a trader budget and calculating the quantity . . . . . . . . . . . . . . . 74


Objectives . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 74
Fetch step_size . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 74
Append budget and step_size to the Trader’s state inside the Leader . . . . . . . . . . . . 75
Append budget and step_size to the Trader’s state . . . . . . . . . . . . . . . . . . . . . . . . 76
Calculate quantity . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 77

Chapter 8 - Add support for multiple transactions per order . . . . . . . . . . . . . . . . . . . 79


Objectives . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 79
Issue with the current implementation . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 79
Improve buy order filled callback . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 81
Implement buy order “filled” callback . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 83
Improve sell order callback . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 83
Test the implementation . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 85

Chapter 9 - Run multiple traders in parallel . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 86


Objectives . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 86
Describe and design the required functionality . . . . . . . . . . . . . . . . . . . . . . . . . . . 86
Implement rebuy inside [Link] . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 88
Implement rebuy in the [Link] . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 91
Improve logs by assigning ids to traders . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 93
Test the implementation . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 96

Chapter 10 - Fine-tune trading strategy per symbol . . . . . . . . . . . . . . . . . . . . . . . . . 99


Objectives . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 99
Describe and design the required functionality . . . . . . . . . . . . . . . . . . . . . . . . . . . 99
Add docker to project . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 100
Set up ecto inside the naive app . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 101
Create and migrate the DB . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 103
Seed symbols’ settings . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 106
Update the [Link] to fetch settings . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 109

Chapter 11 - Supervise and autostart streaming . . . . . . . . . . . . . . . . . . . . . . . . . . . 112


Objectives . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 112
Describe and design the required functionality . . . . . . . . . . . . . . . . . . . . . . . . . . . 112
Register the [Link] processes with names . . . . . . . . . . . . . . . . . . . . . . . 113
Set up ecto inside the streamer app . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 114
Create and migrate the db . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 115
Seed default settings . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 117
Implement the supervision tree and start streaming functionality . . . . . . . . . . . . . . . 118
Implement the stop functionality . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 121
Implement the autostart streaming functionality . . . . . . . . . . . . . . . . . . . . . . . . . . 122
Test the implementation . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 125

Chapter 12 - Start, stop, shutdown and autostart trading . . . . . . . . . . . . . . . . . . . . . 128


Objectives . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 128
Describe and design the required functionality . . . . . . . . . . . . . . . . . . . . . . . . . . . 128
(Re-)Implement the start trading functionality . . . . . . . . . . . . . . . . . . . . . . . . . . . 130
Implement the stop trading functionality . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 132
Implement the autostart trading functionality . . . . . . . . . . . . . . . . . . . . . . . . . . . 133
Implement the shutdown trading functionality . . . . . . . . . . . . . . . . . . . . . . . . . . . 136

Chapter 13 - Abstract duplicated supervision code . . . . . . . . . . . . . . . . . . . . . . . . . 143


Objectives . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 143
Overview of requirements . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 143
Pseudo generalize [Link] module . . . . . . . . . . . . . . . . . . . . . . . . . 143
Utilize pseudo generalized code inside the [Link] . . . . . . . . 147
Implement a truly generic [Link] . . . . . . . . . . . . . . . . . . . . . . . 150
Remove boilerplate using use macro . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 159
Use the [Link] module inside the streamer application . . . . . . . . . . 166

Chapter 14 - Store trade events and orders inside the database . . . . . . . . . . . . . . . . . . 169
Objectives . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 169
Overview of requirements . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 169
Create a new data_warehouse application in the umbrella . . . . . . . . . . . . . . . . . . . . 170
Connect to the database using Ecto . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 170
Store trade events’ data . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 172
Store orders’ data . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 176
Implement supervision . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 183

Chapter 15 - Backtest trading strategy . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 194


Objectives . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 194
Overview of requirements . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 194
Implement the storing task . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 197
Test the backtesting . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 200
Preface
In recent years, the Elixir¹ programming language has gained a lot of interest in the industry.
Its unmatched parallelization capabilities are unique and powerful, making it a great candidate for
highly concurrent systems like the ones trading assets on exchanges.
In this book, we will go through the development process of a cryptocurrency trading bot in Elixir.
We will start from the ground up and, chapter by chapter, progress with the implementation, ending
up with a fully fledged naive trading strategy. We will be designing process supervision trees,
describing why specific decisions were taken and how they will impact the system going forward.
I don't believe, by any stretch of the imagination, that "this is the only way" (nor even the best way)
to create a cryptocurrency trading bot in Elixir. This book focuses more on building a real-life project
and iterating over it, making decisions along the way as would happen in a real work environment.
There are parts that will be "perfect" the first time around, but there are also others where we will
make compromises to "get it working" and then, when the time is right, refactor them as we gain a
better understanding of the domain problem.
Limit of Liability/Disclaimer of Warranty
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
¹[Link]
Who this book is for
This book will be a great resource for everyone who already knows Elixir and wants to get a feel for
what developing a non-trivial system with it looks like.
Readers do not need deep knowledge about cryptocurrencies to follow along, as I will shy away from
crypto / trading jargon and will explain it whenever it's unavoidable.
This is not a book focused on trading strategies, nor is it financial advice to trade at all. The
main focus of this book is to showcase how the implementation of even complex problems can be
achieved in Elixir by simple processes working together in an orchestrated manner.
The strategy described in this book is naive and will most probably end up losing money, but that's
not the point of this book. As we build up the strategy, we will face a spectrum of problems that
developers face at work. It's a great primer if you want to get your first "project" under your belt.
So, if you've already gone through the motions and learned Elixir and OTP but still feel like you
need to get your hands dirty with a "real problem" to "make it stick", this book is for you.
What this book covers
This book is a loosely written representation of the "Create a cryptocurrency trading bot in Elixir²" video
course released on YouTube.
It's still an ongoing production that will be continued in the upcoming months; at this moment it
contains:

Chapter 1 - Stream live crypto prices from Binance WSS


Stream live cryptocurrency prices (trade events) from the Binance exchange. Starting from the ground
up, we will create a new umbrella project and a streamer application inside it. The streamer application
will use a WebSocket client called WebSockex to establish a connection with the Binance API and receive
a live feed. After receiving each event as a JSON string, we will decode it using the jason library and
convert it to our own data struct. At the end of the chapter, we will see decoded trade events being
logged to the terminal.

Chapter 2 - Create a naive trading strategy - single trader without


supervision
In this chapter we will create our first naive trading strategy. We will create another application
inside our umbrella called naive. We will put the data streamed to our streamer application to good use
by sending it over to the naive application. We will start with a very basic solution consisting of a
single process called trader that will utilize the GenServer behaviour. It will allow us to go through
the full trading cycle and will give us something that "works".
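The shape of such a trader process can be sketched with a plain GenServer. This is a minimal illustration only - the module name and the message format below are assumptions for this sketch, not the book's final implementation:

```elixir
defmodule Naive.Trader do
  # A minimal sketch of a single trader process: it keeps the traded
  # symbol and the last observed price in its state and reacts to
  # incoming trade events delivered as plain messages.
  use GenServer

  def start_link(symbol) do
    GenServer.start_link(__MODULE__, symbol)
  end

  @impl true
  def init(symbol) do
    {:ok, %{symbol: symbol, last_price: nil}}
  end

  @impl true
  def handle_info({:trade_event, price}, state) do
    {:noreply, %{state | last_price: price}}
  end
end
```

Started with `Naive.Trader.start_link("XRPUSDT")`, the process would update its state every time it receives a `{:trade_event, price}` message.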

Chapter 3 - Introduce PubSub as a communication method


To allow our trading strategy to scale to multiple parallel traders, we need to find a way to distribute
the latest prices (trade events) to those multiple traders. We will introduce PubSub to broadcast
messages from the streamer(s) to the trader(s). PubSub will allow us to break hardcoded references
between applications in our umbrella, and that will become a pattern that we will utilize moving
forward.
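The book uses a PubSub library for this; as a rough illustration of the broadcast/subscribe idea itself, here is a sketch built on Elixir's standard library Registry module (the registry name and topic string are made up for this example):

```elixir
# Start a duplicate-key Registry that acts as a local PubSub.
{:ok, _} = Registry.start_link(keys: :duplicate, name: :pubsub_sketch)

# A subscriber registers itself under a topic...
{:ok, _} = Registry.register(:pubsub_sketch, "trade_events:xrpusdt", [])

# ...and a publisher dispatches a message to every process on that topic.
Registry.dispatch(:pubsub_sketch, "trade_events:xrpusdt", fn entries ->
  for {pid, _} <- entries, do: send(pid, {:trade_event, "0.24373000"})
end)

# The subscriber (here, this same process) receives the broadcast message.
receive do
  {:trade_event, price} -> IO.puts("Received price: #{price}")
end
```

The publisher never references the subscribers directly - it only knows the topic name, which is exactly the decoupling PubSub gives us between umbrella applications.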

Chapter 4 - Mock the Binance API


Besides historical prices (trade events), to perform backtesting, we need to be able to mock placing
orders and get trade events back as they are filled. In this chapter we will focus on developing a
solution that will allow our trader to "trade" without contacting the Binance exchange (useful for people
without Binance accounts); it will also allow us to backtest our trading strategy.
²[Link]

Chapter 5 - Enable parallel trading on multiple symbols


Our basic strategy implementation from the last chapter is definitely too basic to be used in a
"production environment" - it can neither be scaled nor is it fault-tolerant. In this chapter, we will
upgrade our naive strategy to be more resilient. This will require a supervision tree to be created and
will allow us to see different supervision strategies in action and understand the motivation behind
using and stacking them.

Chapter 6 - Introduce a buy_down_interval to make a single trader


more profitable
At this moment our [Link] implementation blindly places a buy order at the price of
the last trade event. Whenever a [Link] process finishes a trade, a new [Link]
process is started, and it ends up placing a buy order at the same price as the price of the
previous sell order. This costs us double the fee without gaining any advantage and would cause
further complications down the line, so we will introduce a buy_down_interval which will allow the
[Link] processes to place a buy order below the current trade event's price.
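The core of the idea can be sketched as a one-line calculation (using floats for brevity here - the module name and the numbers are illustrative, not the book's implementation):

```elixir
defmodule BuyDownSketch do
  # Derive the buy price from the latest trade event's price by
  # stepping down buy_down_interval (a fraction, e.g. 0.005 for 0.5%).
  def buy_price(last_price, buy_down_interval) do
    last_price * (1 - buy_down_interval)
  end
end

# e.g. with the last price at 0.2437 and a 0.5% buy down interval:
BuyDownSketch.buy_price(0.2437, 0.005)
```

Buying slightly below the last price means a freshly started trader no longer repurchases at the exact price its predecessor just sold at.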

Chapter 7 - Introduce a trader budget and calculating the


quantity
Since the second chapter our [Link] processes have been placing orders with a hardcoded quantity
of 100. In this chapter, we will introduce a budget that will be evenly split between the [Link]
processes using chunks. We will utilize that budget to calculate the quantity (to be able to do that, we
need to fetch further step_size information from the Binance API).
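The quantity calculation can be sketched as follows (floats for brevity; the module name is made up for this example). Exchanges require the quantity to be a multiple of step_size, so the sketch rounds down to the nearest multiple:

```elixir
defmodule QuantitySketch do
  # budget: how much quote currency this trader may spend
  # price: intended buy price
  # step_size: the exchange's minimum quantity increment
  def quantity(budget, price, step_size) do
    exact = budget / price
    # Round down to the nearest multiple of step_size so the
    # exchange accepts the order.
    Float.floor(exact / step_size) * step_size
  end
end

QuantitySketch.quantity(100.0, 0.25, 1.0)  # 400.0
```

Rounding down (rather than to the nearest) guarantees we never exceed the budget.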

Chapter 8 - Add support for multiple transactions per order


Our [Link] implementation assumes that our orders will be filled within a single transaction,
but this isn't always the case. In this chapter, we will discuss how we could implement support
for multiple transactions per order and the race conditions that could occur between the bot and the
Binance API.
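To picture the problem: an order for 100 units may be filled by several trade events, so the trader has to accumulate the filled quantity across callbacks and only treat the order as filled once the total matches. A simplified sketch (names and shapes are illustrative, not the book's actual callback code):

```elixir
defmodule FillSketch do
  # Accumulate partial fills. Returns {:filled, total} once the
  # order quantity is reached, otherwise {:partial, total}.
  def apply_fill(filled_so_far, fill_qty, order_qty) do
    total = filled_so_far + fill_qty
    if total >= order_qty, do: {:filled, total}, else: {:partial, total}
  end
end

FillSketch.apply_fill(0, 40, 100)   # {:partial, 40}
FillSketch.apply_fill(40, 60, 100)  # {:filled, 100}
```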

Chapter 9 - Run multiple traders in parallel


With PubSub, a supervision tree, buy down and a budget in place, we can progress with scaling the
number of traders. This will require further improvements to our trading strategy like introducing a
rebuy_interval. At the end of this chapter our trading strategy will be able to start and run multiple
traders in parallel.
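The rebuy idea can be sketched as a simple predicate (floats for brevity; the module name is made up for this example): when the price drops rebuy_interval below the original buy price, another trader can be started:

```elixir
defmodule RebuySketch do
  # True when the current price has dropped at least rebuy_interval
  # (a fraction, e.g. 0.05 for 5%) below the original buy price.
  def rebuy_triggered?(current_price, buy_price, rebuy_interval) do
    current_price <= buy_price * (1 - rebuy_interval)
  end
end

RebuySketch.rebuy_triggered?(0.90, 1.0, 0.05)  # true
RebuySketch.rebuy_triggered?(0.99, 1.0, 0.05)  # false
```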

Chapter 10 - Fine-tune trading strategy per symbol


Currently, the naive strategy works based on settings hardcoded in the leader module. To allow for
fine-tuning the naive trading strategy per symbol, we will introduce a new database together with a
table that will store trading settings.

Chapter 11 - Supervise and autostart streaming


In the last chapter we introduced a new database inside the naive application to store default settings;
in this chapter we will do the same for the streamer application. Inside the settings there will be
a status flag that will allow us to implement the autostarting functionality on initialization using
the Task abstraction.

Chapter 12 - Start, stop, shutdown and autostart trading


To follow up after autostarting streaming, we will apply the same trick to the trading supervision
tree using the Task abstraction. We will need to introduce a new supervision level to achieve the
correct supervision strategy.

Chapter 13 - Abstract duplicated supervision code


Both the naive and the streamer applications contain almost the same copy-pasted code that
allows us to start, stop and autostart workers. We will look into how we could abstract the common
parts of that implementation into a single module. We will venture into utilizing the __using__ macro
to get rid of the boilerplate.
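To illustrate the mechanism in isolation: a module defines __using__/1, and any module that calls `use` on it gets the quoted code injected at compile time. A minimal sketch (module and function names are made up for this example):

```elixir
defmodule WorkerBoilerplate do
  # The `use WorkerBoilerplate` line in a client module invokes this
  # macro at compile time and injects the quoted code into that module.
  defmacro __using__(_opts) do
    quote do
      def autostart?, do: true
    end
  end
end

defmodule SomeWorker do
  use WorkerBoilerplate
end

SomeWorker.autostart?()  # true
```

Every module that `use`s the shared module gains the injected functions, which is how the copy-pasted start/stop/autostart code can be written once.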

Chapter 14 - Store trade events and orders inside the database


To be able to backtest the trading strategy, we need to have historical prices (trade events) and a list
of placed orders stored in the database, which will be the focus of this chapter. At this moment,
the latest prices (trade events) are broadcast to a PubSub topic that traders subscribe to. We
will create a new application called data_warehouse inside our umbrella that will be responsible for
subscribing to the same PubSub topics and storing incoming prices (trade events) in a Postgres
database. We will update the [Link] module to broadcast orders as traders place them.
Then we will move on to adding supervision similar to the one from the naive and the streamer
applications, but this time we will show how we could avoid using both a common module and macros
by utilizing the Registry module.

Chapter 15 - Backtest trading strategy


In this chapter, we will backtest our trading strategy by developing a publisher inside the
DataWarehouse application. It will stream trade events from the database and broadcast them to
the trade_events:#{symbol} PubSub topic. It will use the same topic as if the data were streamed
directly from Binance. From the trader's perspective, it won't make any difference and will cause
normal trading activity that will be stored inside the database to be analyzed later.
Source code
The source code for this book is hosted on GitHub:
[Link]
The final code of each chapter has its own branch.
Chapter 1 - Stream live crypto prices
from Binance WSS
Objectives
• create an umbrella app
• create a supervised application inside an umbrella
• connect to Binance’s WebSocket Stream using the WebSockex module
• define a TradeEvent struct that will hold incoming data
• decode incoming events using the Jason module

Create an umbrella app


In the beginning, there was nothing… so we need to create an umbrella project:

mix new hedgehog --umbrella

Create a supervised application inside an umbrella


We can now proceed with creating a new supervised application called streamer inside our umbrella:

cd hedgehog/apps
mix new streamer --sup

Connect to Binance’s WebSocket Stream using the


WebSockex module
To establish a connection to the Binance API's stream, we will need a WebSocket client. The
module that we will use is called WebSockex³. Scrolling down to the Installation section inside the
module's readme on GitHub, we can see which dependency we need to add to our project.
We will append :websockex to the deps function inside the [Link] file of the streamer application:
³[Link]

# /apps/streamer/[Link]
defp deps do
  [
    {:websockex, "~> 0.4.2"}
  ]
end

As we added a dependency to our project, we need to fetch it using mix [Link].


We can now progress with creating a module that will be responsible for streaming. We will create
a new file called [Link] inside the apps/streamer/lib/streamer directory.
From the readme of the WebSockex⁴ module, we can see that to use it, we need to create a module that
will implement the WebSockex behavior:

# WebSockex's readme
defmodule WebSocketExample do
  use WebSockex

  def start_link(url, state) do
    WebSockex.start_link(url, __MODULE__, state)
  end

  def handle_frame({type, msg}, state) do
    [Link] "Received Message - Type: #{inspect type} -- Message: #{inspect msg}"
    {:ok, state}
  end

  def handle_cast({:send, {type, msg} = frame}, state) do
    [Link] "Sending #{type} frame with payload: #{msg}"
    {:reply, frame, state}
  end
end

We will copy the whole code above across to our new [Link] file.
The first step will be to update the module name to match our file name:

# /apps/streamer/lib/streamer/[Link]
defmodule [Link] do

In the spirit of keeping things tidy, we will now remove the handle_cast/2 function (the last function
in our module) as we won't be sending any messages back to Binance via WebSocket (to place orders
etc. - Binance provides a REST API, which we will use in the next chapter).
⁴[Link]

Next, let's look up what URL we should use to connect to Binance's API. Binance has separate
WSS (WebSocket Streams) documentation on GitHub⁵.
Scrolling down, we can see the General WSS information section, where 3 important pieces of
information are listed:

The base endpoint is: [Link]


Raw streams are accessed at /ws/<streamName>
All symbols for streams are lowercase

We can see that the full endpoint for raw streams (we will be using a "raw" stream) will be
[Link] with the stream name at the end (together with the lowercased
symbol).
Note: In the context of the Binance API, "raw" means that no aggregation was performed before
broadcasting the data on the WebSocket.
Let's introduce a module attribute that will hold the full raw stream endpoint, which will be used
across the module:

# /apps/streamer/lib/streamer/[Link]
@stream_endpoint "[Link]

Now, back in Binance's WSS documentation⁶, we need to search for "Trade Streams". "Trade" in the
context of this documentation means an exchange of assets (coins/tokens) between two sides (a buyer
and a seller). Our future trading strategy will be interested in the "latest price", which is simply
the last trade event's price.
We can see that the docs point to the following stream name:

Stream Name: <symbol>@trade

Together, our full URL looks like: "[Link]
To give a concrete example: the raw trade events stream URL for the symbol XRPUSDT is: "[Link]
(remember that symbols need to be lowercased, otherwise no trade events will get streamed - there's
no error).
Back in the IDE, we will now modify the start_link/2 function to use the Binance API's URL:

⁵[Link]
⁶[Link]

# /apps/streamer/lib/streamer/[Link]
def start_link(symbol) do
  symbol = [Link](symbol)

  WebSockex.start_link(
    "#{@stream_endpoint}#{symbol}@trade",
    __MODULE__,
    nil
  )
end

Instead of passing a URL, we modified the function to accept a symbol, downcase it, and use it
together with the module's @stream_endpoint attribute to build the full URL.
At this moment, streaming of trade events already works, which we can test using iex:

$ iex -S mix
...
iex(1)> [Link].start_link("xrpusdt")
{:ok, #PID<0.335.0>}
Received Message - Type: :text -- Message: "{\"e\":\"trade\",\"E\":1603226394741,\"s\":\"XRPUSDT\",\"t\":74608889,\"p\":\"0.24373000\",\"q\":\"200.00000000\",\"b\":948244411,\"a\":948244502,\"T\":1603226394739,\"m\":true,\"M\":true}"
Received Message - Type: :text -- Message: "{\"e\":\"trade\",\"E\":1603226394741,\"s\":\"XRPUSDT\",\"t\":74608890,\"p\":\"0.24372000\",\"q\":\"143.20000000\",\"b\":948244412,\"a\":948244502,\"T\":1603226394739,\"m\":true,\"M\":true}"

We can see the messages logged above because we copied the sample implementation from
WebSockex's readme⁷, where the handle_frame/2 function uses [Link]/1 to print out all incoming
data. The lesson here is that every incoming message from Binance will cause the handle_frame/2
callback to be called with the message and the process' state.
Just for reference, our module should currently look as follows:

⁷[Link]

# /apps/streamer/lib/streamer/[Link]
defmodule [Link] do
  use WebSockex

  @stream_endpoint "[Link]

  def start_link(symbol) do
    symbol = [Link](symbol)

    WebSockex.start_link(
      "#{@stream_endpoint}#{symbol}@trade",
      __MODULE__,
      nil
    )
  end

  def handle_frame({type, msg}, state) do
    [Link] "Received Message - Type: #{inspect type} -- Message: #{inspect msg}"
    {:ok, state}
  end
end

Decode incoming events using the Jason module


Currently, all incoming data from the WebSocket is encoded as JSON. To decode JSON we will use the
jason⁸ module.
Scrolling down to the Installation section inside the module's readme, we can see that we need to add
it to the dependencies, and then we can start using it right away.
Let's open the [Link] file of the streamer application and append the :jason dependency to the
list inside the deps function:

# /apps/streamer/[Link]
defp deps do
  [
    {:jason, "~> 1.2"},
    {:websockex, "~> 0.4.2"}
  ]
end

As previously, don’t forget to run mix [Link] to fetch the new dependency.
⁸[Link]

Looking through the documentation of the Jason module, we can see the encode!/2 and decode!/2
functions; both of them have exclamation marks, which indicates that they will raise an error
whenever they are unable to successfully encode or decode the passed value.
This is less than perfect for our use case, as we would like to handle those errors in our own
way (technically we could just use try/rescue, but as we will find out, both encode/2 and decode/2
are available).
We will go a little bit off-topic here, but I would highly recommend these sorts of journeys through
somebody else's code. Let's look inside the Jason⁹ module. Scrolling down in search of decode/2 (without
the exclamation mark), we can see it at around line 54:

# /lib/[Link]
def decode(input, opts \\ []) do
  input = IO.iodata_to_binary(input)
  [Link](input, format_decode_opts(opts))
end

It looks like it uses the parse/2 function of a Decoder module; let's scroll back up and check where
that is coming from. At line 6:

# /lib/[Link]
alias Jason.{Encode, Decoder, DecodeError, EncodeError, Formatter}

we can see that Decoder is an alias of the [Link]¹⁰ module. Scrolling down to the [Link]
module, we will find a parse/2 function at around line 43:

# /lib/[Link]
def parse(data, opts) when is_binary(data) do
  key_decode = key_decode_function(opts)
  string_decode = string_decode_function(opts)
  try do
    value(data, data, 0, [@terminate], key_decode, string_decode)
  catch
    {:position, position} ->
      {:error, %DecodeError{position: position, data: data}}
    {:token, token, position} ->
      {:error, %DecodeError{token: token, position: position, data: data}}
  else
    value ->
      {:ok, value}
  end
end
⁹[Link]
¹⁰[Link]

Based on the result of decoding, it will return either {:ok, value} or {:error, %[Link]{...}}.
We can confirm that by digging through the documentation of the module on hex¹¹.
Once again, the point of this lengthy investigation was to show that Elixir code is readable and
easy to understand, so don't be thrown off when documentation is a little bit light. Quite the opposite:
contribute to the docs and code as you gain a better understanding of the codebase.
We can now get back to our [Link] module and modify the handle_frame/2 function to
decode the incoming JSON message. Based on the result of [Link]/2, we will either call the
process_event/1 function or log an error. Here's the new version of the handle_frame/2 function:

# /apps/streamer/lib/streamer/[Link]
def handle_frame({_type, msg}, state) do
  case [Link](msg) do
    {:ok, event} -> process_event(event)
    {:error, _} -> [Link]("Unable to parse msg: #{msg}")
  end

  {:ok, state}
end

Please note that type is now prefixed with an underscore, as we aren't using it at the moment.
The second important thing to note is that we are using Logger, so it needs to be required at the beginning
of the module:

# /apps/streamer/lib/streamer/[Link]
require Logger

Before implementing the process_event/1 function, we need to create a structure that will hold the
incoming trade event's data.
Let's create a new directory called binance inside apps/streamer/lib/streamer/ and a new file
called trade_event.ex inside it.
Our new module will hold all the trade event's information, but we will also use readable field
names (you will see the incoming data below). We can start by writing a skeleton module:

# /apps/streamer/lib/streamer/binance/trade_event.ex
defmodule [Link] do
  defstruct []
end

We can refer to Binance’s docs¹² to get a list of fields:


¹¹[Link]
¹²[Link]

{
  "e": "trade",     // Event type
  "E": 123456789,   // Event time
  "s": "BNBBTC",    // Symbol
  "t": 12345,       // Trade ID
  "p": "0.001",     // Price
  "q": "100",       // Quantity
  "b": 88,          // Buyer order ID
  "a": 50,          // Seller order ID
  "T": 123456785,   // Trade time
  "m": true,        // Is the buyer the market maker?
  "M": true         // Ignore
}

Let's copy them across, converting the comments into readable field names, and update the defstruct
inside the [Link] module to the following:

# /apps/streamer/lib/streamer/binance/trade_event.ex
defstruct [
  :event_type,
  :event_time,
  :symbol,
  :trade_id,
  :price,
  :quantity,
  :buyer_order_id,
  :seller_order_id,
  :trade_time,
  :buyer_market_maker
]

That's all for this struct. We can now get back to implementing the process_event/1 function
inside the [Link] module. We will map every field of the response map to the
%[Link] struct. A useful trick here is to copy the list of fields once
again from the struct and assign the incoming fields one by one.
Inside the header of the function, we will pattern match on the event type (a field called "e" in the
message) to confirm that we have indeed received a trade event. In the end, the process_event/1 function
should look as follows:

# /apps/streamer/lib/streamer/[Link]
defp process_event(%{"e" => "trade"} = event) do
  trade_event = %[Link]{
    :event_type => event["e"],
    :event_time => event["E"],
    :symbol => event["s"],
    :trade_id => event["t"],
    :price => event["p"],
    :quantity => event["q"],
    :buyer_order_id => event["b"],
    :seller_order_id => event["a"],
    :trade_time => event["T"],
    :buyer_market_maker => event["m"]
  }

  [Link](
    "Trade event received " <>
      "#{trade_event.symbol}@#{trade_event.price}"
  )
end

We added the [Link]/2 call to be able to see logs of incoming trade events.
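The trick that makes this clause selective, matching on a specific map key directly in the function head, can be seen in isolation with a throwaway module (names below are hypothetical):

```elixir
# The first clause only accepts maps whose "e" key equals "trade";
# any other event shape falls through to the catch-all clause
defmodule EventDemo do
  def process(%{"e" => "trade", "p" => price}), do: {:trade, price}
  def process(_event), do: :ignored
end
```

Clauses are tried top to bottom, so the catch-all clause must come last.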
Lastly, before testing our implementation, let's add a nice interface to our streamer application that
allows us to start streaming:

# /apps/streamer/lib/[Link]
defmodule Streamer do
  @moduledoc """
  Documentation for `Streamer`.
  """

  def start_streaming(symbol) do
    [Link].start_link(symbol)
  end
end

The final version of the [Link] module should look like this¹³.
The last step will be to add the Logger configuration to the main config/[Link] file. We will set the
Logger level to :debug for a moment to be able to see incoming trade events:

¹³[Link]

# /config/[Link]
config :logger,
  level: :debug

This finishes the implementation part of this chapter. We can now give our implementation a whirl
using iex:

$ iex -S mix
...
iex(1)> Streamer.start_streaming("xrpusdt")
{:ok, #PID<0.251.0>}
[Link].217 [debug] Trade event received [email protected]
[Link].381 [debug] Trade event received [email protected]
[Link].380 [debug] Trade event received [email protected]
[Link].386 [debug] Trade event received [email protected]

As we can see, the streamer establishes a WebSocket connection with Binance's API and
receives trade events. It decodes them from JSON to the %[Link] struct and
logs a compiled message. Also, our interface hides implementation details from the "user" of our
application.
We will now flip the Logger level back to :info so the output won't include every incoming trade event:

# /config/[Link]
config :logger,
  level: :info

[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github¹⁴
¹⁴[Link]
Chapter 2 - Create a naive trading
strategy - a single trader process
without supervision
Objectives
• create another supervised application inside the umbrella to store our trading strategy
• define callbacks for events dependent on the state of the trader
• push events from the streamer app to the naive app

Initialization
To develop our naive strategy, we will need to create a new supervised application inside our umbrella
project:

cd apps
mix new naive --sup

We can now focus on creating a trader abstraction inside that newly created application. First, we
need to create a new file called [Link] inside apps/naive/lib/naive/.
Let’s start with a skeleton of a GenServer:

# /apps/naive/lib/naive/[Link]
defmodule [Link] do
  use GenServer

  require Logger

  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: :trader)
  end

  def init(args) do
    {:ok, args}
  end
end

Our module uses the GenServer¹⁵ behaviour, and to fulfill its "contract", we need to implement the
init/1 function. The start_link/1 function is a convention, and it allows us to register the process
with a name (it's the default function that a supervisor will use when starting the Trader). We also add
require Logger as we will keep on logging across the module.
Next, let’s model the state of our server:

# /apps/naive/lib/naive/[Link]
defmodule State do
  @enforce_keys [:symbol, :profit_interval, :tick_size]
  defstruct [
    :symbol,
    :buy_order,
    :sell_order,
    :profit_interval,
    :tick_size
  ]
end

Our trader needs to know:

• what symbol it needs to trade ("symbol" here is a pair of assets, for example "XRPUSDT",
which is XRP to/from USDT)
• the placed buy order (if any)
• the placed sell order (if any)
• the profit interval (what net profit % we would like to achieve when buying and selling an asset -
a single trade cycle)
• the tick_size (yes, I know, jargon warning. We can't ignore it here as it needs to be fetched from
Binance and it's used to calculate a valid price. Tick size differs between symbols and it is the
smallest acceptable price movement up or down. For example, in the physical world the tick size for
USD is a single cent: you can't sell something for $1.234, it's either $1.23 or $1.24, the one cent
difference between those being the tick size - more info here¹⁶)

Our strategy won't be able to work without the symbol, profit_interval, or tick_size, so we added them
to the @enforce_keys attribute. This will ensure that we won't create an invalid %State{} without
those values.
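The effect of @enforce_keys can be demonstrated with a throwaway module (not the project's State struct):

```elixir
defmodule StateDemo do
  @enforce_keys [:symbol]
  defstruct [:symbol, :buy_order]
end

# providing the enforced key works fine
valid = struct!(StateDemo, symbol: "XRPUSDT")

# omitting it raises an ArgumentError (rescued here just to show the effect)
invalid =
  try do
    struct!(StateDemo, [])
  rescue
    ArgumentError -> :raised
  end
```

The same check applies at compile time when the `%StateDemo{}` literal syntax is used without the enforced keys.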
Now that we know our GenServer will need to receive those details via args, we can update the
pattern matching in the start_link/1 and init/1 functions to confirm that the passed values are indeed
maps:

¹⁵[Link]
¹⁶[Link]

# /apps/naive/lib/naive/[Link]
def start_link(%{} = args) do
  ...
end

def init(%{symbol: symbol, profit_interval: profit_interval}) do
  ...
end

As we are already in the init/1 function, we will need to modify it to fetch the tick_size for the
passed symbol and initialize a fresh state:

# /apps/naive/lib/naive/[Link]
def init(%{symbol: symbol, profit_interval: profit_interval}) do
  symbol = [Link](symbol)

  [Link]("Initializing new trader for #{symbol}")

  tick_size = fetch_tick_size(symbol)

  {:ok,
   %State{
     symbol: symbol,
     profit_interval: profit_interval,
     tick_size: tick_size
   }}
end

We are uppercasing the symbol above as Binance's REST API accepts only uppercased symbols.
It's time to connect to Binance's REST API. The easiest way to do that will be to use the binance¹⁷
module.
As previously, looking through the module's docs on GitHub, we can see the Installation
section. We will follow the steps mentioned there, starting from adding binance to the deps in
/apps/naive/[Link]:

¹⁷[Link]

# /apps/naive/[Link]
defp deps do
  [
    {:binance, "~> 0.7.1"},
    {:decimal, "~> 2.0"},
    {:streamer, in_umbrella: true}
  ]
end

Besides adding the :binance module, we also added :decimal and :streamer. The decimal¹⁸
module will help us to calculate the buy and sell prices (without the decimal module we would have
problems with precision). Lastly, we need to include the :streamer application (created in the first
chapter) as we will use the %[Link]{} struct inside the naive application.
We need to run mix [Link] to install our new deps.
We can now get back to the trader module and focus on fetching the tick size from Binance:

# /apps/naive/lib/naive/[Link]
defp fetch_tick_size(symbol) do
  Binance.get_exchange_info()
  |> elem(1)
  |> [Link](:symbols)
  |> [Link](&(&1["symbol"] == symbol))
  |> [Link]("filters")
  |> [Link](&(&1["filterType"] == "PRICE_FILTER"))
  |> [Link]("tickSize")
  |> [Link]()
end

We are using get_exchange_info/0 to fetch the list of symbols, which we will filter to find the symbol
that we were requested to trade. The tick size is defined as a PRICE_FILTER filter. Here's the link¹⁹ to the
documentation listing all keys in the result. In a nutshell, here's how the important parts of the result
look:

¹⁸[Link]
¹⁹[Link]

{:ok, %{
  ...
  "symbols": [
    %{
      "symbol": "ETHBTC",
      ...
      "filters": [
        ...
        %{"filterType": "PRICE_FILTER", "tickSize": tickSize, ...}
      ],
      ...
    }
  ]
}}
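To build intuition for how the pipeline walks that structure, we can rehearse the same lookup on a hardcoded sample using Map.get/2 and Enum.find/2 (the values below are made up for illustration):

```elixir
# Hardcoded sample mirroring the shape of the exchange info response
sample = %{
  symbols: [
    %{
      "symbol" => "ETHBTC",
      "filters" => [
        %{"filterType" => "PRICE_FILTER", "tickSize" => "0.00000100"}
      ]
    }
  ]
}

tick_size =
  sample
  |> Map.get(:symbols)
  |> Enum.find(&(&1["symbol"] == "ETHBTC"))
  |> Map.get("filters")
  |> Enum.find(&(&1["filterType"] == "PRICE_FILTER"))
  |> Map.get("tickSize")

# tick_size is still a string here; the real code converts it afterwards
```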

How will the trading strategy work?


Our trader process has an internal state that will serve as a indicator where in the trade cycle is it.
The following diagram shows 3 possible trader states that trader will progress through from left to
right:

Trader states

Our trader will be receiving trade events sequentially and take decisions
based on it’s own state and trade event’s contents.
We will focus on a trader in 3 different states:

• a new trader without any orders
• a trader with a buy order placed
• a trader with a sell order placed.

First state - A new trader

The trader doesn't have any open orders, which we can confirm by pattern matching on the buy_order
field from its state. From the incoming event, we can get the current price, which we will use in the
buy order that the trader will place.

Second state - Buy order placed

After placing a buy order, the trader will pattern match to confirm that
the incoming event filled its buy order, ignoring it otherwise.
When a trade event matching the order id of the trader's buy order arrives, it means that the buy order got
filled (a simplification - our order could be filled in two or more transactions, but the implementation in
this chapter won't cater for this case; it will always assume that it got filled in a single transaction)
and the trader can now place the sell order based on the expected profit and the buy_price.

Third state - Sell order placed

Finally, in a very similar fashion to the previous state, the trader will pattern match to confirm
that the incoming event filled its sell order (matching order id), ignoring it otherwise.
When a trade event matching the order id of the trader's sell order arrives, it means that the sell order
got filled (the same simplification as described above), the full trade cycle has ended, and the trader
can now exit.

Implementation of the first scenario

Enough theory :) back to the editor, we will implement the first scenario. Before doing that, let's alias
Streamer's TradeEvent struct as we will rely on it heavily in pattern matching.

# /apps/naive/lib/naive/[Link]
alias [Link]

To confirm that we are dealing with a "new" trader, we will pattern match on buy_order: nil from
its state:

# /apps/naive/lib/naive/[Link]
def handle_cast(
      %TradeEvent{price: price},
      %State{symbol: symbol, buy_order: nil} = state
    ) do
  quantity = 100 # <= Hardcoded until chapter 7

  [Link]("Placing BUY order for #{symbol} @ #{price}, quantity: #{quantity}")

  {:ok, %[Link]{} = order} =
    Binance.order_limit_buy(symbol, quantity, price, "GTC")

  {:noreply, %{state | buy_order: order}}
end

For the time being, we will keep the quantity hardcoded as this chapter would
get really long otherwise - don't worry, we will refactor this in one of the next chapters.
After confirming that we are dealing with a "new" trader (by pattern matching on the buy_order field
from the state), we can safely progress to placing a new buy order. We just need to remember to return
the updated state, as otherwise the trader will go on a shopping spree: every next incoming event would
cause further buy orders (the above pattern match would continue to be successful).

Implementation of the second scenario

With that out of the way, we can now move on to monitoring for an event that matches our buy order
id and quantity, to confirm that our buy order got filled:

# /apps/naive/lib/naive/[Link]
def handle_cast(
      %TradeEvent{
        buyer_order_id: order_id,
        quantity: quantity
      },
      %State{
        symbol: symbol,
        buy_order: %[Link]{
          price: buy_price,
          order_id: order_id,
          orig_qty: quantity
        },
        profit_interval: profit_interval,
        tick_size: tick_size
      } = state
    ) do
  sell_price = calculate_sell_price(buy_price, profit_interval, tick_size)

  [Link](
    "Buy order filled, placing SELL order for " <>
      "#{symbol} @ #{sell_price}), quantity: #{quantity}"
  )

  {:ok, %[Link]{} = order} =
    Binance.order_limit_sell(symbol, quantity, sell_price, "GTC")

  {:noreply, %{state | sell_order: order}}
end

We will implement the sell price calculation in a separate function based on the buy price, profit interval,
and tick_size.
Our pattern match will confirm that our buy order indeed got filled (the order_id and quantity match),
so we can now proceed with placing a sell order using the calculated sell price and the quantity retrieved
from the buy order.
Again, don't forget to return the updated state, as otherwise the trader will keep on placing sell orders
for every incoming event.
To calculate the sell price we will need precise math, and that will require a dedicated module.
We will use the Decimal²⁰ module, so first, let's alias it at the top of the file:

# /apps/naive/lib/naive/[Link]
alias Decimal, as: D

Now, to calculate the correct sell price, we can use the following formula, which gets us pretty close
to the expected value:

# /apps/naive/lib/naive/[Link]
defp calculate_sell_price(buy_price, profit_interval, tick_size) do
  fee = [Link]("1.001")
  original_price = [Link]([Link](buy_price), fee)

  net_target_price =
    [Link](
      original_price,
      [Link]("1.0", profit_interval)
    )

  gross_target_price = [Link](net_target_price, fee)

  D.to_float(
    [Link](
      D.div_int(gross_target_price, tick_size),
      tick_size
    )
  )
end

²⁰[Link]

First, we convert all the numbers to Decimal structs so it will be easier to work
with them. We also hardcode the fee, which we will refactor in one of the future chapters.
We start by calculating the original_price, which is the buy
price together with the fee that we paid on top of it.
Next, we enlarge the originally paid price by the profit interval to get the net_target_price.
As we will be charged a fee for selling, we need to add the fee again on top of this target sell
price (gross_target_price).
Finally, we use the tick size: Binance won't accept any prices that aren't divisible by the symbol's
tick size, so we need to "normalize" the price on our side.
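To see the formula end to end, here is a float-only rehearsal with made-up numbers. It is for intuition only; the real implementation uses Decimal precisely because floats drift:

```elixir
# Hypothetical inputs, not from a real trade
buy_price = 0.2596
fee = 1.001
profit_interval = -0.01
tick_size = 0.0001

# price actually paid once the exchange fee is included
original_price = buy_price * fee
# enlarge by the (here negative) profit interval
net_target_price = original_price * (1.0 + profit_interval)
# add the fee we will pay when selling
gross_target_price = net_target_price * fee
# normalize: round down to the nearest multiple of the tick size
sell_price = trunc(gross_target_price / tick_size) * tick_size
```

With these numbers the target of roughly 0.25752 gets normalized down to 0.2575, a valid price for a 0.0001 tick size.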

Implementation of the third scenario

Getting back to handling incoming events, we can now add a clause for a trader that wants to
confirm that its sell order was filled:

# /apps/naive/lib/naive/[Link]
def handle_cast(
      %TradeEvent{
        seller_order_id: order_id,
        quantity: quantity
      },
      %State{
        sell_order: %[Link]{
          order_id: order_id,
          orig_qty: quantity
        }
      } = state
    ) do
  [Link]("Trade finished, trader will now exit")

  {:stop, :normal, state}
end

When the sell order is successfully filled (confirmed by the pattern matching above), there's nothing
else for the trader to do, so it can return a tuple with the :stop atom, which will cause the trader process
to terminate.

Implementation of the fallback scenario

The final callback function that we will need to implement will simply ignore all
incoming events, as they were not matched by any of the previous pattern matches:

# /apps/naive/lib/naive/[Link]
def handle_cast(%TradeEvent{}, state) do
  {:noreply, state}
end

We need this callback for cases where our trader has an "open" order (not yet filled) and the incoming
event has nothing to do with it, so it needs to be ignored.

Updating the Naive interface

Now we will update the interface of our naive application by modifying the Naive module to allow
sending an event to the trader:

# /apps/naive/lib/[Link]
defmodule Naive do
  @moduledoc """
  Documentation for `Naive`.
  """
  alias [Link]

  def send_event(%TradeEvent{} = event) do
    [Link](:trader, event)
  end
end

We will use the fact that we registered our trader process with a name to be able to cast a message
to it.

Updating the streamer app

To glue our apps together for the time being and keep things simple in this chapter, we will modify the
streamer process to simply call our new Naive interface directly, by appending the following function
call at the end of the process_event/1 function inside the [Link] module:

# /apps/streamer/lib/streamer/[Link]
Naive.send_event(trade_event)

This creates a two-way link between the streamer and the naive app. In the next chapter we will fix
that, as in a perfect world those apps shouldn't even be aware of each other's existence.

Access details to Binance

Inside the main config of our umbrella project, we need to define the access details for our Binance
account:

config :binance,
  api_key: "YOUR-API-KEY-HERE",
  secret_key: "YOUR-SECRET-KEY-HERE"

Important note: To be able to run the test below and perform real trades, a Binance account
with a balance of at least 20 USDT is required. In the 4th chapter we will focus on creating a BinanceMock
that will allow us to run our bot without requiring a real Binance account. You don't need to perform this
test run now if you don't want to have an account.

Test run
Now it's time to give our implementation a run for its money. Once again, to be able to do that
you will need to have at least 20 USDT tokens in your Binance wallet, and you will lose just under
0.5% of your USDT (as the "expected profit" is below 0 to quickly showcase the full trade cycle) in the
following test:

$ iex -S mix
...
iex(1)> [Link].start_link(%{symbol: "XRPUSDT", profit_interval: [Link]("-0.01")})
[Link].648 [info] Initializing new trader for XRPUSDT
{:ok, #PID<0.355.0>}
iex(2)> Streamer.start_streaming("xrpusdt")
{:ok, #PID<0.372.0>}
iex(3)>
[Link].561 [info] Placing BUY order for XRPUSDT @ 0.25979000, quantity: 100
[Link].831 [info] Buy order filled, placing SELL order for XRPUSDT @ 0.2577, quantity: 100
[Link].094 [info] Trade finished, trader will now exit

After starting the IEx session, we start the trader process with a map containing the symbol and profit
interval. To be able to quickly test the full trade cycle, we pass a sub-zero profit interval instead of
waiting for an increase in price.
Next, we start streaming on the same symbol; please be aware that this will cause an immediate
reaction in the trader process.
We can see that our trader placed a buy order at 25.979c per XRP, which was filled in under 300ms, so
the trader then placed a sell order at ∼25.77c,
which was also filled in under 300ms. This way the trader finished the trade
cycle and the process can terminate.
That's it. Congratulations! You just made your first algorithmic trade and you should be proud of
that! In the process of creating that algorithm, we touched on multiple topics, including a GenServer
depending on its state and external data (trade events) to perform different actions - this is a
very common workflow that Elixir engineers follow, and it's great to see it in action.
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github²¹
²¹[Link]
Chapter 3 - Introduce PubSub as a
communication method
Objectives
• consider the reasons why introducing PubSub communication would be beneficial
• implement the PubSub communication between the [Link] and the [Link](s)

Design
First, let’s look at the current situation:

Current situation

We started with the Binance streamer calling the send_event/1 function on the Naive module. The
Naive module then calls the trader process using the GenServer's cast/2 function (via its registered
name).
The next step in the process of extending our trading strategy will be to scale it to run multiple
[Link] processes in parallel. To be able to do that, we will need to remove the option to
register the trader process with a name (as only one process can be registered under a single name).
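The "one process per name" limitation is easy to demonstrate with a minimal GenServer (the module and name below are hypothetical):

```elixir
defmodule NamedDemo do
  use GenServer

  def start_link(args) do
    # registering under a fixed name, like our :trader
    GenServer.start_link(__MODULE__, args, name: :demo_trader)
  end

  def init(args), do: {:ok, args}
end

# the first trader grabs the name...
{:ok, pid} = NamedDemo.start_link(%{})

# ...and a second one is refused, so parallel named traders are impossible
{:error, {:already_started, ^pid}} = NamedDemo.start_link(%{})
```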

Multiple traders can’t have the same process names

The second issue with that design is the fact that the Streamer needs to be aware of all processes
that are interested in the streamed data and explicitly push that information to them.
To fix those issues we will invert the design and introduce a PubSub mechanism:

PubSub introduced to invert the dependency and enable multiple parallel traders

The streamer will broadcast trade events to a PubSub topic, and whatever is interested in that data
can subscribe to the topic and receive the broadcasted messages.
There's no coupling between the Streamer and Naive app anymore.
We can now introduce multiple traders that will subscribe to the topic, and
they will receive messages from the PubSub:

Multiple traders subscribing to PubSub

Going even further down the line, we can picture that the system could consist of other processes
interested in the streamed data. An example of those could be a process that saves all streamed
information to the database to be utilized in backtesting later on:

Multiple subscribers including traders and other processes
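The subscribe/broadcast mechanics we are after can be sketched with the standard library's Registry in :duplicate mode. The book uses Phoenix.PubSub; this is only an illustration of the idea, with hypothetical names:

```elixir
{:ok, _} = Registry.start_link(keys: :duplicate, name: DemoPubSub)

# a subscriber (here the current process) registers interest in a topic
{:ok, _} = Registry.register(DemoPubSub, "trade_events:XRPUSDT", [])

# a broadcaster sends a message to every subscriber of that topic,
# without knowing who the subscribers are
Registry.dispatch(DemoPubSub, "trade_events:XRPUSDT", fn entries ->
  for {pid, _value} <- entries, do: send(pid, {:trade_event, :fake_event})
end)

# the subscriber receives the broadcast as a plain message
received =
  receive do
    {:trade_event, payload} -> payload
  after
    1_000 -> :timeout
  end
```

Note how the dependency is inverted: the broadcaster only knows the topic name, never the subscribers.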

Implementation
We will start by adding the [Link]²² library to both the Streamer and Naive apps (both will be
using it: the Streamer app as a broadcaster and the Naive app as a subscriber).
Scrolling down through its README on GitHub, we can see that we need to add :phoenix_pubsub to
the list of dependencies:

# /apps/streamer/[Link] & /apps/naive/[Link]
defp deps do
  [
    ...
    {:phoenix_pubsub, "~> 2.0"},
    ...
  ]
end
²²[Link]

Remember to place it so the list keeps its alphabetical order. The second step in the README says that we
need to add PubSub as a child of our app. We need to decide where we will put it; Streamer sounds
like a good starting point. We will modify the /apps/streamer/lib/streamer/[Link]
module by appending the PubSub to it:

# /apps/streamer/lib/streamer/[Link]
def start(_type, _args) do
  children = [
    {
      [Link],
      name: [Link], adapter_name: [Link].PG2
    }
  ]
  ...
end

We added the :adapter_name option to instruct PubSub to use the pg²³ adapter, which will give us
distributed process groups.
We will now modify the streamer to broadcast a message to the PubSub instead of using the Naive
module's function:

# /apps/streamer/lib/streamer/[Link]
defp process_event(...) do
  ...

  [Link](
    [Link],
    "trade_events:#{trade_event.symbol}",
    trade_event
  )
end

Inside the trader, on init, we need to subscribe to the "trade_events" PubSub channel:

²³[Link]

# /apps/naive/lib/naive/[Link]
def init(...) do
  ...

  [Link](
    [Link],
    "trade_events:#{symbol}"
  )

  ...
end

Next, we need to convert all the handle_cast callbacks to handle_info inside our Trader module, as
PubSub doesn't use [Link]/2 to send messages over to subscribers.
The final change will be to remove the send_event/1 function from the Naive
module, as it's no longer required.
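The reason for the conversion is that PubSub broadcasts land in the subscriber's mailbox as plain messages, which a GenServer routes to handle_info/2. A throwaway GenServer (hypothetical names) makes this visible:

```elixir
defmodule InfoDemo do
  use GenServer

  def start_link(parent), do: GenServer.start_link(__MODULE__, parent)
  def init(parent), do: {:ok, parent}

  # plain messages (like PubSub broadcasts) arrive here, not in handle_cast/2
  def handle_info({:trade_event, payload}, parent) do
    send(parent, {:handled, payload})
    {:noreply, parent}
  end
end

{:ok, pid} = InfoDemo.start_link(self())

# simulate a broadcast by sending a plain message to the server...
send(pid, {:trade_event, :fake_event})

# ...and observe that handle_info/2 picked it up
result =
  receive do
    {:handled, payload} -> payload
  after
    1_000 -> :timeout
  end
```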
Our update is now finished, so we can start an iex session to see how it works.
First, we will start a streamer process that will broadcast messages
to the PubSub. Next, we will start trading on the same symbol. On init, the trader will subscribe to a
PubSub channel and it will make a full trade cycle.

$ iex -S mix
...
iex(1)> Streamer.start_streaming("xrpusdt")
{:ok, #PID<0.483.0>}
iex(2)> [Link].start_link(%{symbol: "XRPUSDT", profit_interval: [Link]("-0.01")})
[Link].482 [info] Initializing new trader for XRPUSDT
{:ok, #PID<0.474.0>}
iex(3)>
[Link].179 [info] Placing BUY order for XRPUSDT @ 0.29462000, quantity: 100
[Link].783 [info] Buy order filled, placing SELL order for XRPUSDT @ 0.29225), quantity: 100.00000000
[Link].029 [info] Trade finished, trader will now exit

This shows that the new trader process successfully subscribed to the PubSub, received the broadcasted
messages, placed buy/sell orders, and terminated after the full trade cycle finished.
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github²⁴
²⁴[Link]
Chapter 4 - Mock the Binance API
Objectives
• design the binance mock application
• create a new app
• implement getting exchange info
• implement placing buy and sell orders
• implement callback for incoming trade events
• upgrade trader and config
• test the implementation

Design
First, let’s start with the current state:

Current state of affairs

Currently, our trader is using the Binance module to place buy/sell
orders and to get exchange info.
The get_exchange_info/0 function doesn't require a Binance account, as it's publicly available
information, so we can call the Binance lib directly from our module.
The remaining functions (buying/selling) require a Binance account and some coins/tokens inside its wallet.
We need to mock those inside our module.
We will update the trader to fetch the Binance module's name from the config:

Binance client dictated by config

We will set up the config so it points to the Binance client to be used: either Binance or BinanceMock.
Regarding the BinanceMock itself, it will have the same interface as the Binance module.
It will need to store both buy and sell orders, and it will allow us to retrieve them. That covers the
REST functions, but Binance also streams back trade events for those orders as they get filled. That's
why BinanceMock will also need to broadcast fake events to the "trade_events:#" PubSub topic so
the trader will pick them up:

BinanceMock needs to broadcast fake trade events back to the PubSub

When exactly should we broadcast those fake trade events? Well, the best thing
that we can do is make the BinanceMock process subscribe to the trade events stream and try to
broadcast fake trade events whenever the price of an order would be matched:

When to send fake trade events

Starting from the arrow on the left, our naive strategy will place an order at the current price.
In this hypothetical scenario, the price was rising for a moment after placing the buy order, so
BinanceMock will keep on waiting until a trade event with a price below the buy order's price gets
broadcast from the PubSub. At that moment, BinanceMock will generate a fake trade event
and broadcast it to the same PubSub topic.

The trader will get that event, assume that it came from Binance and that the buy order got filled,
and place a sell order.
Similarly to the buy order, BinanceMock will keep on waiting until a trade event with a price above
the sell order's price gets broadcast from the PubSub. At that moment, BinanceMock will
generate a fake trade event and broadcast it to the same PubSub topic.
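The matching rule described above can be captured in a small predicate. The map shape and field names below are hypothetical, not the actual BinanceMock implementation:

```elixir
# A buy order can fill once the traded price drops to or below its limit;
# a sell order can fill once the traded price rises to or above its limit
filled? = fn
  %{side: "BUY", price: limit}, trade_price -> trade_price <= limit
  %{side: "SELL", price: limit}, trade_price -> trade_price >= limit
end
```

Whenever this predicate returns true for a stored order and an incoming trade event's price, the mock would emit its fake fill event.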
Enough theory for now, let's get our hands dirty with some coding!

Create “BinanceMock” app


We will start by creating a new supervised app called BinanceMock:

$ cd apps
$ mix new binance_mock --sup

The next step will be to update the BinanceMock module to be a GenServer.
We will utilize:

• the Decimal module for comparing the prices
• the Logger module for logging

We will also define an internal %State{} struct that will hold:

• a map called order_books, with one order book per traded symbol
• a list of symbols that the mock subscribed to
• the last generated id, for consistently generating unique ids for fake trade events

The order_books map will consist of :"#{symbol}" => %OrderBook{} entries. We will define the %OrderBook{}
struct as 3 lists: buy_side, sell_side, and historical:

# /apps/binance_mock/lib/binance_mock.ex
defmodule BinanceMock do
  use GenServer

  alias Decimal, as: D

  require Logger

  defmodule State do
    defstruct order_books: %{}, subscriptions: [], fake_order_id: 1
  end

  defmodule OrderBook do
    defstruct buy_side: [], sell_side: [], historical: []
  end

  def start_link(_args) do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  def init(_args) do
    {:ok, %State{}}
  end
end

Implement getting exchange info

As mentioned before, to retrieve exchange info we can just call Binance's function directly, as
it's publicly available information:

# /apps/binance_mock/lib/binance_mock.ex
def get_exchange_info() do
  Binance.get_exchange_info()
end

Implement placing buy and sell orders

For buy and sell limit orders, we will write a helper function, as the logic is
the same for both order sides:

# /apps/binance_mock/lib/binance_mock.ex
def order_limit_buy(symbol, quantity, price, "GTC") do
  order_limit(symbol, quantity, price, "BUY")
end

def order_limit_sell(symbol, quantity, price, "GTC") do
  order_limit(symbol, quantity, price, "SELL")
end

The “order_limit” helper function will:



• ensure that quantity and price are float values as the Binance module accepts both strings and
floats
• generate a fake order based on symbol, quantity, price, and side
• cast a message to the BinanceMock process to add the fake order
• return an {:ok, %OrderResponse{}} tuple to be consistent with the Binance module:

1 # /apps/binance_mock/lib/binance_mock.ex
2 defp order_limit(symbol, quantity, price, side) do
3 quantity = Float.parse("#{quantity}") |> elem(0)
4 price = Float.parse("#{price}") |> elem(0)
5
6 %Binance.Order{} =
7 fake_order =
8 generate_fake_order(
9 symbol,
10 quantity,
11 price,
12 side
13 )
14
15 GenServer.cast(
16 __MODULE__,
17 {:add_order, fake_order}
18 )
19
20 {:ok, convert_order_to_order_response(fake_order)}
21 end

We can now move on to the implementation of the handle_cast/2 callback that will :add_order to the
order book for the symbol from the order.
It needs to do two things:

• subscribe to the trade_events:#{symbol} topic for the symbol from

the order

• add the order to the correct order book



1 # /apps/binance_mock/lib/binance_mock.ex
2 def handle_cast(
3 {:add_order, %Binance.Order{symbol: symbol} = order},
4 %State{
5 order_books: order_books,
6 subscriptions: subscriptions
7 } = state
8 ) do
9 new_subscriptions = subscribe_to_topic(symbol, subscriptions)
10 updated_order_books = add_order(order, order_books)
11
12 {
13 :noreply,
14 %{
15 state
16 | order_books: updated_order_books,
17 subscriptions: new_subscriptions
18 }
19 }
20 end

We will start with the implementation of the subscribe_to_topic/2 function. We need to make sure
that the symbol is uppercased, as well as check whether we have already subscribed to that topic.
Otherwise, we can safely use the PubSub module to subscribe to the trade_events:#{symbol} topic for
this symbol.
We need to remember to append the symbol to the list of subscriptions and return the updated list:

1 # /apps/binance_mock/lib/binance_mock.ex
2 defp subscribe_to_topic(symbol, subscriptions) do
3 symbol = String.upcase(symbol)
4 stream_name = "trade_events:#{symbol}"
5
6 case Enum.member?(subscriptions, symbol) do
7 false ->
8 Logger.debug("BinanceMock subscribing to #{stream_name}")
9
10 Phoenix.PubSub.subscribe(
11 Streamer.PubSub,
12 stream_name
13 )
14
15 [symbol | subscriptions]
16
17 _ ->
18 subscriptions
19 end
20 end

Next, time for the implementation of the add_order function. First, we need to get the order book for
the symbol of the order. Depending on the side of the order, we will update either the buy_side or
sell_side list, remembering that both sides are sorted. We are sorting them so we can easily grab all
orders that should be filled whenever a trade event arrives; this will become clearer as we write the
handle callback for incoming trade events:

1 # /apps/binance_mock/lib/binance_mock.ex
2 defp add_order(
3 %Binance.Order{symbol: symbol} = order,
4 order_books
5 ) do
6 order_book =
7 Map.get(
8 order_books,
9 :"#{symbol}",
10 %OrderBook{}
11 )
12
13 order_book =
14 if order.side == "SELL" do
15 Map.replace!(
16 order_book,
17 :sell_side,
18 [order | order_book.sell_side]
19 |> Enum.sort(&D.lt?(D.new(&1.price), D.new(&2.price)))
20 )
21 else
22 Map.replace!(
23 order_book,
24 :buy_side,
25 [order | order_book.buy_side]
26 |> Enum.sort(&D.gt?(D.new(&1.price), D.new(&2.price)))
27 )
28 end
29
30 Map.put(order_books, :"#{symbol}", order_book)
31 end

Now we need to follow up and implement the functions that we referred to
previously - those are generate_fake_order and convert_order_to_order_response.
Starting with generate_fake_order, it's a function that takes a symbol, quantity, price and
side and, based on those values, returns a Binance.Order struct. To return the struct we will need
to generate a unique id for each fake order - this is where fake_order_id will be used(callback
implemented later). This way we will be able to run tests multiple times using the BinanceMock and
always get the same ids:

1 # /apps/binance_mock/lib/binance_mock.ex
2 defp generate_fake_order(symbol, quantity, price, side)
3 when is_binary(symbol) and
4 is_float(quantity) and
5 is_float(price) and
6 (side == "BUY" or side == "SELL") do
7 current_timestamp = :os.system_time(:millisecond)
8 order_id = GenServer.call(__MODULE__, :generate_id)
9 client_order_id = :crypto.hash(:md5, "#{order_id}") |> Base.encode16()
10
11 Binance.Order.new(%{
12 symbol: symbol,
13 order_id: order_id,
14 client_order_id: client_order_id,
15 price: Float.to_string(price),
16 orig_qty: Float.to_string(quantity),
17 executed_qty: "0.00000000",
18 cummulative_quote_qty: "0.00000000",
19 status: "NEW",
20 time_in_force: "GTC",
21 type: "LIMIT",
22 side: side,
23 stop_price: "0.00000000",
24 iceberg_qty: "0.00000000",
25 time: current_timestamp,
26 update_time: current_timestamp,
27 is_working: true
28 })
29 end

We can now focus on converting the Binance.Order to the Binance.OrderResponse struct. As the
Binance.Order struct contains almost all of the same fields as the Binance.OrderResponse struct,
we can use the struct function without an exclamation mark to ignore all additional fields. The only
field that has a different name is the transact_time field, which is called time in the Binance.Order
struct - we can fix that separately:

1 # /apps/binance_mock/lib/binance_mock.ex
2 defp convert_order_to_order_response(%Binance.Order{} = order) do
3 %{
4 struct(
5 Binance.OrderResponse,
6 order |> Map.to_list()
7 )
8 | transact_time: order.time
9 }
10 end

The last function to finish support for placing buy and sell orders is a callback that will increment
the fake order id and return it:

1 # /apps/binance_mock/lib/binance_mock.ex
2 def handle_call(
3 :generate_id,
4 _from,
5 %State{fake_order_id: id} = state
6 ) do
7 {:reply, id + 1, %{state | fake_order_id: id + 1}}
8 end

Implement order retrieval


We can now move on to retrieving the orders. First, we need to add an interface function that will
call our BinanceMock GenServer:

1 # /apps/binance_mock/lib/binance_mock.ex
2 def get_order(symbol, time, order_id) do
3 GenServer.call(
4 __MODULE__,
5 {:get_order, symbol, time, order_id}
6 )
7 end

The callback itself is pretty straightforward. We will need to get the order book for the passed symbol.
As we don't know the order's side, we will concatenate all 3 lists(buy_side, sell_side and historical)
and try to find an order that matches the passed symbol, time and order_id:

1 # /apps/binance_mock/lib/binance_mock.ex
2 def handle_call(
3 {:get_order, symbol, time, order_id},
4 _from,
5 %State{order_books: order_books} = state
6 ) do
7 order_book =
8 Map.get(
9 order_books,
10 :"#{symbol}",
11 %OrderBook{}
12 )
13
14 result =
15 (order_book.buy_side ++
16 order_book.sell_side ++
17 order_book.historical)
18 |> Enum.find(
19 &(&1.symbol == symbol and
20 &1.time == time and
21 &1.order_id == order_id)
22 )
23
24 {:reply, {:ok, result}, state}
25 end

Implement callback for incoming trade events


Finally, we need to handle incoming trade events(streamed from the PubSub topic). We need to
implement a callback that will:

• get the order book for the symbol from the trade event
• use the take_while/2 function on the buy orders with prices that are greater than the current
price - we can update their status to filled.
• use the take_while/2 function again, this time on sell orders with prices less than the current
price; we will also update their statuses to filled.
• concat both lists of filled orders, convert them to trade events and broadcast them to the
PubSub’s trade_events topic.
• remove the filled orders from buy and sell lists and put them into the historical list.

Here we can clearly see the benefit of sorting the lists: we can use functions like take_while/2 and
drop/2 instead of filter/2 and reject/2(the latter would traverse whole lists, which could become
a bottleneck when multiple open orders are active):

1 # /apps/binance_mock/lib/binance_mock.ex
2 def handle_info(
3 %Streamer.Binance.TradeEvent{} = trade_event,
4 %{order_books: order_books} = state
5 ) do
6 order_book =
7 Map.get(
8 order_books,
9 :"#{trade_event.symbol}",
10 %OrderBook{}
11 )
12
13 filled_buy_orders =
14 order_book.buy_side
15 |> Enum.take_while(&D.lt?(D.new(trade_event.price), D.new(&1.price)))
16 |> Enum.map(&Map.replace!(&1, :status, "FILLED"))
17
18 filled_sell_orders =
19 order_book.sell_side
20 |> Enum.take_while(&D.gt?(D.new(trade_event.price), D.new(&1.price)))
21 |> Enum.map(&Map.replace!(&1, :status, "FILLED"))
22
23 (filled_buy_orders ++ filled_sell_orders)
24 |> Enum.map(&convert_order_to_event(&1, trade_event.event_time))
25 |> Enum.each(&broadcast_trade_event/1)
26
27 remaining_buy_orders =
28 order_book.buy_side
29 |> Enum.drop(length(filled_buy_orders))
30
31 remaining_sell_orders =
32 order_book.sell_side
33 |> Enum.drop(length(filled_sell_orders))
34
35 order_books =
36 Map.replace!(
37 order_books,
38 :"#{trade_event.symbol}",
39 %{
40 buy_side: remaining_buy_orders,
41 sell_side: remaining_sell_orders,
42 historical:
43 filled_buy_orders ++
44 filled_sell_orders ++
45 order_book.historical
46 }
47 )
48
49 {:noreply, %{state | order_books: order_books}}
50 end
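To make the benefit of keeping the sides sorted concrete, here is a tiny standalone illustration(made-up float prices for brevity - the real code uses Decimal). Because the buy side is sorted in descending order, take_while/2 stops at the first order that the incoming price does not cross instead of scanning the whole list:

```elixir
# Buy side kept sorted in descending order of price
buy_side = [0.32, 0.31, 0.29]
current_price = 0.30

# Orders priced above the current price are considered filled;
# take_while/2 stops as soon as it hits 0.29
filled = Enum.take_while(buy_side, &(&1 > current_price))

IO.inspect(filled) # => [0.32, 0.31]
```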

Inside the callback, we referred to two new functions that we will implement now(convert_order_to_event
and broadcast_trade_event).
Starting with the convert_order_to_event function, it will simply return a new Streamer.Binance.TradeEvent
struct filled with data. An interesting thing to observe here is that again all values are predictable,
and the function will return the same values for the same input - this will become beneficial for
backtesting over and over again and comparing the behaviour between runs:

1 # /apps/binance_mock/lib/binance_mock.ex
2 defp convert_order_to_event(%Binance.Order{} = order, time) do
3 %Streamer.Binance.TradeEvent{
4 event_type: order.type,
5 event_time: time - 1,
6 symbol: order.symbol,
7 trade_id: Integer.floor_div(time, 1000),
8 price: order.price,
9 quantity: order.orig_qty,
10 buyer_order_id: order.order_id,
11 seller_order_id: order.order_id,
12 trade_time: time - 1,
13 buyer_market_maker: false
14 }
15 end

Broadcasting the trade event to the PubSub will be the last function that will finish
the implementation of the BinanceMock for now. It's important to upcase the symbol, as we want to be
sure that we will match the topic name, which is case-sensitive:

1 # /apps/binance_mock/lib/binance_mock.ex
2 defp broadcast_trade_event(%Streamer.Binance.TradeEvent{} = trade_event) do
3 symbol = String.upcase(trade_event.symbol)
4
5 Phoenix.PubSub.broadcast(
6 Streamer.PubSub,
7 "trade_events:#{symbol}",
8 trade_event
9 )
10 end

That finishes the BinanceMock implementation. Now, we need to add it to
the children list of the application so it starts automatically:

1 # /apps/binance_mock/lib/binance_mock/application.ex
2 ...
3 def start(_type, _args) do
4 children = [
5 {BinanceMock, []}
6 ]
7 ...
8 end
9 end

Upgrade trader and config


We can move on to the Naive.Trader module, where we will add a module attribute that will point to
the Binance client dictated by the config:

1 # /apps/naive/lib/naive/trader.ex
2 @binance_client Application.get_env(:naive, :binance_client)

We need to replace all direct calls to the Binance module with calls to the @binance_client attribute
inside the Naive.Trader:

1 # /apps/naive/lib/naive/trader.ex
2
3 ...
4 @binance_client.order_limit_buy(
5 ...
6 @binance_client.order_limit_sell
7 ...
8 @binance_client.get_exchange_info()
9 ...

As the Naive.Trader now relies on the config to specify which Binance client should be used,
we need to add it to the config:

1 # /config/config.exs
2
3 config :naive,
4 binance_client: BinanceMock

The last modification to our system will be to update the mix.exs of the binance_mock app to list all
deps required for it to work:

1 # /apps/binance_mock/mix.exs
2 ...
3 defp deps do
4 [
5 {:binance, "~> 0.7.1"},
6 {:decimal, "~> 2.0"},
7 {:phoenix_pubsub, "~> 2.0"},
8 {:streamer, in_umbrella: true}
9 ]
10 end
11 ...

We also need to add :binance_mock to the list of deps of the naive app(as the naive app will use either
Binance or BinanceMock to "trade"):

1 # /apps/naive/mix.exs
2 ...
3 defp deps do
4 [
5 ...
6 {:binance_mock, in_umbrella: true}
7 ...
8 ]
9 end
10 ...

Test the implementation


We can now see the BinanceMock in action. First, we will start an iex session and double-check that
the BinanceMock process is alive:

1 $ iex -S mix
2 ...
3 iex(1)> Process.whereis(BinanceMock)
4 #PID<0.320.0> # <- confirms that BinanceMock process is alive
5 iex(2)> Streamer.start_streaming("xrpusdt")
6 {:ok, #PID<0.332.0>}
7 iex(3)> Naive.Trader.start_link(%{symbol: "XRPUSDT", profit_interval: Decimal.new(\
8 "-0.001")})
9 [info] Initializing new trader for XRPUSDT
10 {:ok, #PID<0.318.0>}
11 [info] Placing BUY order for XRPUSDT @ 0.29520000, quantity: 100
12 [info] Buy order filled, placing SELL order for XRPUSDT @ 0.29549, quantity: 100.0
13 [info] Trade finished, trader will now exit

As the config already points to the BinanceMock, we can continue as previously by starting the
streaming and trading on the symbol. The trader is using the BinanceMock, and it looks like
everything works as if it were dealing with a real exchange.
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github²⁵
²⁵[Link]
Chapter 5 - Enable parallel trading on
multiple symbols
Objectives
• design supervision tree that will allow trading using multiple traders in parallel per symbol
• update application supervisor
• implement Naive.SymbolSupervisor
• implement Naive.Leader

Introduction - architectural design


In the second chapter we implemented a basic trader which goes through the trading cycle. Inside
the IEx session, we were starting the Naive.Trader process using the start_link/1 function:

IEx session starting the Trader

GenServer.start_link/3 creates a link between IEx's process and the new Naive.Trader process.
Whenever the trader terminates(either it finishes the trading cycle or there was an error), a new one
won't get started, as there's no supervision at all.
We can do much better than that with a little bit of help from Elixir and OTP.

Let's introduce a supervisor above our trader process. It will start a new trader process whenever
the previous one finished/crashed:

Supervisor starting the Trader

This looks much better, but there are a few problems with it. When the trader starts to place
orders, it will hold some state(buy/sell orders) that the supervisor won't be aware of. In
case of the trader crashing, the supervisor will start a new trader without any knowledge of possibly
placed orders or any other information from the state(it will be started with a "fresh" state).
To fix that, we need to keep a copy of the trader's state outside of the trader process - that's why we
will introduce a new server called Naive.Leader that will keep track of traders' data:

Leader -> Supervisor -> Trader

The Naive.Leader will become the interface to start new traders. It will call the start_child/1
function of the Supervisor; consequently, the DynamicTraderSupervisor will call the start_link/1
function of our Naive.Trader module.
We can also see that our Naive.Traders are now started with the temporary restart option. Setting
this option will disallow the Supervisor from restarting the traders on its own. The responsibility of
restarting traders will now be shifted to the leader. The leader will monitor the traders and restart
them to a correct state when any of them crashes.
As the trader's state gets updated, it will notify the leader about its new state to be stored. This way

whenever a trader crashes, the leader will be able to start a new trader process with the last known
state. This setup will also allow us to start and supervise multiple traders for a single symbol, which
our naive strategy will require in the future(next chapter).
For each symbol that we will be trading on, we need the above trio of services(Leader +
DynamicTraderSupervisor + Trader). To effectively initialize(and supervise) them, we will add a
Naive.SymbolSupervisor that will start both the Naive.Leader and the Naive.DynamicTraderSupervisor:

Symbol supervisor added

We will need multiple symbol supervisors, one for each symbol that we would like to trade on. As
with traders, they will be dynamically started on demand. This should give us a hint that we need
another dynamic supervisor that will supervise symbol supervisors and will be a direct child of our
Naive.Supervisor(inside the Naive.Application module):

Full supervision tree

You could ask yourself why we don't need some additional server to track which symbols are traded
at the moment(in the same way as the Naive.Leader tracks the Naive.Traders). The answer is that we
don't need to track them, as we register all Naive.SymbolSupervisors with a name containing the
symbol that they trade on. This way, we will always be able to refer to them by their registered
names instead of pids/refs.
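As a hypothetical illustration of that naming scheme, interpolating a module inside an atom keeps its full name(including the Elixir. prefix), so the registered name can always be rebuilt from the symbol alone:

```elixir
# :"#{__MODULE__}-#{symbol}" inside Naive.SymbolSupervisor produces atoms like this:
symbol = "XRPUSDT"
name = :"#{Naive.SymbolSupervisor}-#{symbol}"

IO.inspect(name) # => :"Elixir.Naive.SymbolSupervisor-XRPUSDT"

# Anywhere in the app, the process can now be reached without knowing its pid:
# Process.whereis(name)
```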
Here’s what happens starting from the top of the graph:

• the Naive.Supervisor is our top-level application's supervisor for the naive app; it was auto-
generated as a part of the naive app
• it has a single child, the Naive.DynamicSymbolSupervisor, which has the one_for_one strategy,
and all of its children are Naive.SymbolSupervisors

• each Naive.SymbolSupervisor process will start 2 further children: the Naive.Leader and the
Naive.DynamicTraderSupervisor, both created on init
• the Naive.Leader will ask the Naive.DynamicTraderSupervisor to start the Naive.Trader child
process(es)

This can be a little bit confusing at the moment, but it will get a lot easier
as we write the code. Let's get to it!

Update application supervisor


Let's start by adding a DynamicSupervisor, registered as Naive.DynamicSymbolSupervisor, to the
children list of the Naive.Application supervisor:

1 # /apps/naive/lib/naive/application.ex
2 def start(_type, _args) do
3 children = [
4 {
5 DynamicSupervisor,
6 strategy: :one_for_one,
7 name: Naive.DynamicSymbolSupervisor
8 }
9 ]
10
11 ...
12 end

Add interface method


We will now add an interface function to the Naive module that will instruct the Naive.DynamicSymbolSupervisor
to start the Naive.SymbolSupervisor(to be implemented next) as its child:

1 # /apps/naive/lib/naive.ex
2 def start_trading(symbol) do
3 symbol = String.upcase(symbol)
4
5 {:ok, _pid} =
6 DynamicSupervisor.start_child(
7 Naive.DynamicSymbolSupervisor,
8 {Naive.SymbolSupervisor, symbol}
9 )
10 end

Implement Naive.SymbolSupervisor
Next, time for the Naive.SymbolSupervisor. The first step will be to create a file called
symbol_supervisor.ex inside the apps/naive/lib/naive directory. There's no point in using the
DynamicSupervisor²⁶, as we know the children that we would like to start automatically on init. This
is a full implementation of the supervisor, and it's as simple as listing the child processes inside the
init function:

1 # /apps/naive/lib/naive/symbol_supervisor.ex
2 defmodule Naive.SymbolSupervisor do
3 use Supervisor
4
5 require Logger
6
7 def start_link(symbol) do
8 Supervisor.start_link(
9 __MODULE__,
10 symbol,
11 name: :"#{__MODULE__}-#{symbol}"
12 )
13 end
14
15 def init(symbol) do
16 Logger.info("Starting new supervision tree to trade on #{symbol}")
17
18 Supervisor.init(
19 [
20 {
21 DynamicSupervisor,
22 strategy: :one_for_one,
23 name: :"Naive.DynamicTraderSupervisor-#{symbol}"
24 },
25 {Naive.Leader, symbol}
26 ],
27 strategy: :one_for_all
28 )
29 end
30 end

It’s advised to keep supervisor processes slim.


²⁶[Link]

We registered the Naive.SymbolSupervisor processes with names, which will help us understand
the supervision tree inside the observer GUI(it will also allow us to stop those supervisors in the
future).
As mentioned previously, whenever either the Naive.Leader or the Naive.DynamicTraderSupervisor-#{symbol}
crashes, we would like to kill the other child process, as we won't be able to recover the state -
it's just easier to init both again.

Implement Naive.Leader
It's time for the Naive.Leader module. Again, the first step will be to create a file called leader.ex
inside the apps/naive/lib/naive directory. At this moment it will be a skeleton GenServer
implementation just to get the code to compile:

1 # /apps/naive/lib/naive/leader.ex
2 defmodule Naive.Leader do
3 use GenServer
4
5 def start_link(symbol) do
6 GenServer.start_link(
7 __MODULE__,
8 symbol,
9 name: :"#{__MODULE__}-#{symbol}"
10 )
11 end
12
13 def init(symbol) do
14 {:ok, %{symbol: symbol}}
15 end
16 end

At this moment we have half of the supervision tree working, so we can give it
a spin in iex. Using the observer, we will be able to see all processes created when the start_trading/1
function gets called:

1 $ iex -S mix
2 ...
3 iex(1)> :observer.start()

The above function will open a new window looking as follows:



New observer window

To clearly see the supervision tree, we will click on the "Applications" tab at the top - the following
tree of processes will be shown on the left:

Observer applications list

If any other process tree is visible, go to the list on the left and select the naive application.
The Naive.Supervisor is our Naive.Application module(you can confirm that by checking the name
option sent to the start_link function inside the module). It starts the Naive.DynamicSymbolSupervisor.
We can now call the Naive.start_trading/1 function a couple of times to see how the tree will look
with additional processes(go back to the iex session):

1 ...
2 iex(2)> Naive.start_trading("adausdt")
3 [info] Starting new supervision tree to trade on ADAUSDT
4 {:ok, #PID<0.340.0>}
5 iex(3)> Naive.start_trading("xrpusdt")
6 [info] Starting new supervision tree to trade on XRPUSDT
7 {:ok, #PID<0.345.0>}

We can see that two new branches were created:

• SymbolSupervisor-ADAUSDT
• SymbolSupervisor-XRPUSDT

Each of them contains a Naive.Leader and a Naive.DynamicTraderSupervisor.

Updating the leader module


Let's jump back to extending the leader implementation to get those traders running.
We will introduce the leader's state that will consist of the symbol, settings and a list of traders' data.
The trader data will hold the pid, ref and state of the trader:

1 # /apps/naive/lib/naive/leader.ex
2 ...
3 alias Decimal, as: D
4 alias Naive.Trader
5 require Logger
6
7 @binance_client Application.get_env(:naive, :binance_client)
8
9 defmodule State do
10 defstruct symbol: nil,
11 settings: nil,
12 traders: []
13 end
14
15 defmodule TraderData do
16 defstruct pid: nil,
17 ref: nil,
18 state: nil
19 end

We will use the handle_continue callback, which was introduced in OTP 21, to
initialize the leader asynchronously. To do that, we will return a tuple containing {:continue, :start_traders}
from inside the init function:

1 # /apps/naive/lib/naive/leader.ex
2 def init(symbol) do
3 {:ok,
4 %State{
5 symbol: symbol
6 }, {:continue, :start_traders}}
7 end

Inside the handle_continue/2 callback, the leader will fetch the symbol settings and, based on them,
build the state for traders so they don't need to fetch the same settings again. It will also start as
many traders as set under the chunks key in the settings:

1 # /apps/naive/lib/naive/leader.ex
2 # below init()
3 def handle_continue(:start_traders, %{symbol: symbol} = state) do
4 settings = fetch_symbol_settings(symbol)
5 trader_state = fresh_trader_state(settings)
6 traders = for _i <- 1..settings.chunks,
7 do: start_new_trader(trader_state)
8
9 {:noreply, %{state | settings: settings, traders: traders}}
10 end

Fetching the symbol settings will be hardcoded for the time being to keep this chapter focused. We
will also move the code responsible for fetching the tick size from the Naive.Trader to the
Naive.Leader and hardcode the rest of the values:

1 # /apps/naive/lib/naive/leader.ex
2 defp fetch_symbol_settings(symbol) do
3 tick_size = fetch_tick_size(symbol)
4
5 %{
6 symbol: symbol,
7 chunks: 1,
8 # -0.12% for quick testing
9 profit_interval: D.new("-0.0012"),
10 tick_size: tick_size
11 }
12 end
13
14 defp fetch_tick_size(symbol) do
15 @binance_client.get_exchange_info()
16 |> elem(1)
17 |> Map.get(:symbols)
18 |> Enum.find(&(&1["symbol"] == symbol))
19 |> Map.get("filters")
20 |> Enum.find(&(&1["filterType"] == "PRICE_FILTER"))
21 |> Map.get("tickSize")
22 |> D.new()
23 end

Additionally, we need to create the helper function used inside the handle_continue/2 callback,
called fresh_trader_state/1:

1 # /apps/naive/lib/naive/leader.ex
2 # place this one above the `fetch_symbol_settings` function
3 defp fresh_trader_state(settings) do
4 struct(Trader.State, settings)
5 end

Starting a new trader isn't any different from the code that we already wrote to start a new
Naive.SymbolSupervisor. We need to call the DynamicSupervisor.start_child/2 function and start
to monitor the process:

1 # /apps/naive/lib/naive/leader.ex
2 defp start_new_trader(%Trader.State{} = state) do
3 {:ok, pid} =
4 DynamicSupervisor.start_child(
5 :"Naive.DynamicTraderSupervisor-#{state.symbol}",
6 {Naive.Trader, state}
7 )
8
9 ref = Process.monitor(pid)
10
11 %TraderData{pid: pid, ref: ref, state: state}
12 end

Updating the Naive.Trader module


Now we can update the Naive.Trader. First, we will set restart to :temporary to avoid the trader
being restarted by the DynamicTraderSupervisor:

1 # /apps/naive/lib/naive/trader.ex
2 defmodule Naive.Trader do
3 use GenServer, restart: :temporary
4 ...

Next, we will update start_link/1 and init/1 to take the state instead of building it from args:

1 # /apps/naive/lib/naive/trader.ex
2 def start_link(%State{} = state) do
3 GenServer.start_link(__MODULE__, state)
4 end
5
6 def init(%State{symbol: symbol} = state) do
7 symbol = String.upcase(symbol)
8
9 Logger.info("Initializing new trader for #{symbol}")
10
11 Phoenix.PubSub.subscribe(
12 Streamer.PubSub,
13 "trade_events:#{symbol}"
14 )
15
16 {:ok, state}
17 end

Next, we need to update the two handle_info/2 callbacks that change the state of the Naive.Trader
process(when placing a buy order and when placing a sell order). They will need to notify the
Naive.Leader that the state has changed before returning it:

1 # /apps/naive/lib/naive/trader.ex
2 ...
3
4 def handle_info(
5 ...
6 ) do
7 Logger.info("Placing buy order (#{symbol}@#{price})")
8 ...
9 new_state = %{state | buy_order: order}
10 Naive.Leader.notify(:trader_state_updated, new_state)
11 {:noreply, new_state}
12 end
13
14 def handle_info(
15 ...
16 ) do
17 ...
18 Logger.info("Buy order filled, placing sell order ...")
19 ...
20
21 new_state = %{state | sell_order: order}
22 Naive.Leader.notify(:trader_state_updated, new_state)
23 {:noreply, new_state}
24 end
25 ...

Finalizing the Naive.Leader implementation


Now we need to get back to the Naive.Leader, where we will implement the notifying logic. We will
start with the notify/2 function that will just call the Naive.Leader process:

1 # /apps/naive/lib/naive/leader.ex
2 # below init
3
4 def notify(:trader_state_updated, trader_state) do
5 GenServer.call(
6 :"#{__MODULE__}-#{trader_state.symbol}",
7 {:update_trader_state, trader_state}
8 )
9 end

Now, it's time for a callback function that will handle the trader state update. As this is a
handle_call/3 callback, we have access to the trader pid which sent the notification message. We will
try to find that trader in the list of traders. If that's successful, we will update the cached state for
that trader locally:

1 # /apps/naive/lib/naive/leader.ex
2 # below handle_continue
3 def handle_call(
4 {:update_trader_state, new_trader_state},
5 {trader_pid, _},
6 %{traders: traders} = state
7 ) do
8 case Enum.find_index(traders, &(&1.pid == trader_pid)) do
9 nil ->
10 Logger.warn(
11 "Tried to update the state of trader that leader is not aware of"
12 )
13 {:reply, :ok, state}
14
15 index ->
16 old_trader_data = Enum.at(traders, index)
17 new_trader_data = %{old_trader_data | :state => new_trader_state}
18
19 {:reply, :ok, %{state | :traders =>
20 List.replace_at(traders, index, new_trader_data)}}
21 end
22 end

The other callback functions that we need to provide are two handle_info/2 functions that will
handle the trade finished scenario as well as a crashed trader.
First, the trade finished scenario. As previously, we will try to find the trader data in the traders list.
If that's successful, we will start a new trader with a fresh state. We will also overwrite the existing
trader data locally(as pid, ref and state changed):

1 # /apps/naive/lib/naive/leader.ex
2 # below state updated handle_call callback
3 def handle_info(
4 {:DOWN, _ref, :process, trader_pid, :normal},
5 %{traders: traders, symbol: symbol, settings: settings} = state
6 ) do
7 Logger.info("#{symbol} trader finished trade - restarting")
8
9 case Enum.find_index(traders, &(&1.pid == trader_pid)) do
10 nil ->
11 Logger.warn(
12 "Tried to restart finished #{symbol} " <>
13 "trader that leader is not aware of"
14 )
15
16 {:noreply, state}
17
18 index ->
19 new_trader_data = start_new_trader(fresh_trader_state(settings))
20 new_traders = List.replace_at(traders, index, new_trader_data)
21
22 {:noreply, %{state | traders: new_traders}}
23 end
24 end

Here we will assume that whenever the reason for the Naive.Trader process dying is :normal, it
means that we stopped it after the trade cycle finished.
The final callback that we need to provide will handle the scenario where the trader crashed. We
would like to find the cached state of the crashed trader, start a new one with the same state, and then
update the local cache, as the pid and ref will change for that trader:

1 # /apps/naive/lib/naive/leader.ex
2 # below trade finished handle_info callback
3 def handle_info(
4 {:DOWN, _ref, :process, trader_pid, _reason},
5 %{traders: traders, symbol: symbol} = state
6 ) do
7 Logger.error("#{symbol} trader died - trying to restart")
8
9 case Enum.find_index(traders, &(&1.pid == trader_pid)) do
10 nil ->
11 Logger.error(
12 "Tried to restart #{symbol} trader " <>
13 "but failed to find its cached state"
14 )
15
16 {:noreply, state}
17
18 index ->
19 trader_data = Enum.at(traders, index)
20 new_trader_data = start_new_trader(trader_data.state)
21 new_traders = List.replace_at(traders, index, new_trader_data)
22
23 {:noreply, %{state | traders: new_traders}}
24 end
25 end

IEx testing
That finishes the implementation part - let's jump into an iex session to see how it works.
We will start the observer first, then we will start trading on any valid symbol.
When our trader starts, you should be able to right-click on it and select "Kill process"(leave the
reason as kill) and click "OK". At that moment you should see that the PID of the trader changed,
and we can also see a log message from the leader.

1 $ iex -S mix
2 ...
3 iex(1)> :observer.start()
4 :ok
5 iex(2)> Naive.start_trading("xrpusdt")
6
7 [info] Starting new supervision tree to trade on XRPUSDT
8 {:ok, #PID<0.455.0>}
9 [info] Initializing new trader for XRPUSDT
10 iex(3)>
11 [error] XRPUSDT trader died - trying to restart
12 [info] Initializing new trader for XRPUSDT

[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github²⁷
²⁷[Link]
Chapter 6 - Introduce a
buy_down_interval to make a single
trader more profitable
Objectives
• present reasons why to introduce buy_down_interval
• add buy_down_interval to Naive.Trader's state and calculate the buy price
• add buy_down_interval to Naive.Trader's state compiled by the Naive.Leader
• manually test the implementation inside iex

Why do we need to buy below the current price? Feature


overview

Current silly buy price

The Naive.Trader process(marked in the above diagram with blue color), at the arrival of the first
trade event, immediately places a buy order at the current price. At the moment when the buy order
gets filled, it places a sell order which later also gets filled.
The Trader A exits and a new trader B is started, which again immediately places a buy order at
the same price as the previous trader just sold at. When this gets filled, a sell order gets placed and
the loop continues on and on.
We can see that there's a problem here, as we just paid the fee twice(once for selling by the Trader A
and once for buying by the Trader B) without really gaining anything(the Trader A could just hold
the currency and simply cash in on double the profit in this specific situation).
The solution is to be more clever about our buy order's price. The idea is simple: instead of placing a
new buy order at the current price (the price from the last TradeEvent), we will introduce a
buy_down_interval:

Buy price reduced by buy_down_interval

So, when every new Naive.Trader process receives its first trade event, it will take the event's price and
calculate a decreased price using the buy_down_interval value (for example, 0.005 would be
0.5%), and place a buy order at that calculated price.
When looking at the chart above, we can figure out that the buy_down_interval should never be smaller
than double the fee (at the moment of writing, the transaction fee is 0.1%) that you are paying per
transaction.

Naive.Trader implementation

Let's open the Naive.Trader module's file (/apps/naive/lib/naive/trader.ex) and add buy_down_interval to its state:

1 # /apps/naive/lib/naive/trader.ex
2 ...
3 defmodule State do
4 @enforce_keys [
5 :symbol,
6 :buy_down_interval, # <= add this line
7 :profit_interval,
8 :tick_size
9 ]
10 defstruct [
11 :symbol,
12 :buy_order,
13 :sell_order,
14 :buy_down_interval, # <= add this line
15 :profit_interval,
16 :tick_size
17 ]
18 end
19 ...

Next, we need to update the initial handle_info/2 callback which places the buy order. We need
to retrieve the buy_down_interval and the tick_size from the state of the trader to be able to
calculate the buy price. We will put the logic to calculate that price in a separate function at the end
of the file:

1 # /apps/naive/lib/naive/trader.ex
2 ...
3 def handle_info(
4 %TradeEvent{price: price},
5 %State{
6 symbol: symbol,
7 buy_order: nil,
8 buy_down_interval: buy_down_interval, # <= add this line
9 tick_size: tick_size # <= add this line
10 } = state
11 ) do
12 price = calculate_buy_price(price, buy_down_interval, tick_size)
13 # ^ add above call
14 ...

To calculate the buy price, we will use a method very similar to the one used
before to calculate the sell price. First, we will cast all variables
into Decimal structs; then we will multiply the price by the buy_down_interval and subtract the result from
the price. The number that we will end up with won't necessarily be a legal price, as every price
needs to be divisible by the tick_size, which we will ensure in the last calculation:

1 # /apps/naive/lib/naive/trader.ex
2 ...
3 defp calculate_buy_price(price, buy_down_interval, tick_size) do
4 current_price = D.from_float(price)
5
6 # not necessarily legal price
7 exact_buy_price =
8 D.sub(
9 current_price,
10 D.mult(current_price, buy_down_interval)
11 )
12
13 D.to_float(
14 D.mult(
15 D.div_int(exact_buy_price, tick_size),
16 tick_size
17 )
18 )
19 end
20 ...
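As a quick sanity check (a standalone sketch, not part of the project files; it assumes Elixir 1.12+ for Mix.install and the `decimal` hex package, and the example numbers are made up), we can run the same calculation on concrete values:

```elixir
Mix.install([{:decimal, "~> 2.0"}])
alias Decimal, as: D

price = D.from_float(0.1518)       # price from the last trade event
buy_down_interval = D.new("0.005") # 0.5%
tick_size = D.new("0.0001")

# 0.1518 - (0.1518 * 0.005) = 0.151041 - not divisible by the tick_size
exact_buy_price = D.sub(price, D.mult(price, buy_down_interval))

# rounding down to the nearest multiple of the tick_size gives a legal price
legal_buy_price = D.mult(D.div_int(exact_buy_price, tick_size), tick_size)
IO.inspect(D.to_float(legal_buy_price)) # => 0.151
```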

Naive.Leader implementation

Next, we need to update the Naive.Leader, as it needs to add the buy_down_interval to the Naive.Trader's
state:

1 # /apps/naive/lib/naive/leader.ex
2 defp fetch_symbol_settings(symbol) do
3 ...
4
5 %{
6 chunks: 1,
7 buy_down_interval: Decimal.new("0.0001"), # <= add this line
8 # -0.12% for quick testing
9 profit_interval: Decimal.new("-0.0012"),
10 tick_size: tick_size
11 }
12 end
13 ...

IEx testing
That finishes the buy_down_interval implementation. We will jump into an iex session to see how it
works, but before that, we will temporarily change the logging level to debug to see current prices:

1 # config/config.exs
2 ...
3 config :logger,
4 level: :debug # <= updated for our manual test
5 ...

After starting the streaming, we should start seeing log messages with current prices. As we updated
our implementation, the buy order should now be placed below the current price, as visible below:

1 $ iex -S mix
2 ...
3 iex(1)> Streamer.start_streaming("FLMUSDT")
4 {:ok, #PID<0.313.0>}
5 iex(2)> Naive.start_trading("FLMUSDT")
6 [Link].829 [info] Starting new supervision tree to trade on FLMUSDT
7 ...
8 [Link].755 [info] Initializing new trader for FLMUSDT
9 ...
10 [Link].000 [debug] Trade event received [email protected]
11 [Link].009 [info] Placing BUY order for FLMUSDT @ 0.1517, quantity: 100

As we can see, our Naive.Trader process placed a buy order below the current price (based on the most
recent trade event received).
[Note] Please remember to revert the change to the logger level, as otherwise there's too much noise in
the logs.
[Note 2] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github²⁸
²⁸[Link]
Chapter 7 - Introduce a trader budget
and calculating the quantity
Objectives
• fetch step_size
• append budget and step_size to the Trader’s state compiled by the Leader
• append budget and step_size to the Trader’s state
• calculate quantity

Fetch step_size
In the 2nd chapter we hardcoded the quantity to 100; it's time to refactor that. We will need the step_size
information from Binance, which we are
already retrieving together with the tick_size in the exchangeInfo call (but not getting it out of the
response). So we will rename the fetch_tick_size/1 function to fetch_symbol_filters/1, which
will allow us to return multiple filters (tick_size and step_size) from that function.

1 # /apps/naive/lib/naive/leader.ex
2 ...
3 defp fetch_symbol_settings(symbol) do
4 symbol_filters = fetch_symbol_filters(symbol) # <= updated fetch_tick_size
5
6 Map.merge(
7 %{
8 chunks: 1,
9 budget: 20,
10 buy_down_interval: Decimal.new("0.0001"),
11 # -0.12% for quick testing
12 profit_interval: Decimal.new("-0.0012")
13 },
14 symbol_filters
15 )
16 end
17
18 defp fetch_symbol_filters(symbol) do # <= updated fetch_tick_size

19 symbol_filters =
20 @binance_client.get_exchange_info()
21 |> elem(1)
22 |> Map.get(:symbols)
23 |> Enum.find(&(&1["symbol"] == symbol))
24 |> Map.get("filters")
25
26 tick_size =
27 symbol_filters
28 |> Enum.find(&(&1["filterType"] == "PRICE_FILTER"))
29 |> Map.get("tickSize")
30 |> Decimal.new()
31
32 step_size =
33 symbol_filters
34 |> Enum.find(&(&1["filterType"] == "LOT_SIZE"))
35 |> Map.get("stepSize")
36 |> Decimal.new()
37
38 %{
39 tick_size: tick_size,
40 step_size: step_size
41 }
42 end

Instead of reassigning the filters one by one into the settings, we will merge them together (#1).
Additionally, we will introduce a budget (#2) which will be shared across all traders of the symbol.
Also, we don't need to assign the tick_size here, as it's part of the filters that get merged.

Append budget and step_size to the Trader's state inside the Leader
The budget needs to be added to the %State{} of the trader (step_size will be automatically passed on by
struct/2) inside fresh_trader_state/1 (where we initialize the state of traders). Before
we assign it, we need to divide it by the number of chunks, as each trader gets only a chunk of the
budget:
Chapter 7 - Introduce a trader budget and calculating the quantity 76

1 # /apps/naive/lib/naive/leader.ex
2 defp fresh_trader_state(settings) do
3 %{
4 struct(Trader.State, settings) |
5 budget: D.div(settings.budget, settings.chunks)
6 }
7 end

In the code above, we are using the Decimal module (aliased as D) to calculate the budget - we need
to alias it at the top of the Naive.Leader module's file:

1 # /apps/naive/lib/naive/leader.ex
2 defmodule Naive.Leader do
3 use GenServer
4
5 alias Decimal, as: D # <= add this line
6 alias Naive.Trader
7 ...

Append budget and step_size to the Trader's state

We need to add both the budget and the step_size to the Naive.Trader's state struct:

1 # /apps/naive/lib/naive/trader.ex
2 ...
3 defmodule State do
4 @enforce_keys [
5 :symbol,
6 :budget, # <= add this line
7 :buy_down_interval,
8 :profit_interval,
9 :tick_size,
10 :step_size # <= add this line and comma above
11 ]
12 defstruct [
13 :symbol,
14 :budget, # <= add this line
15 :buy_order,
16 :sell_order,
17 :buy_down_interval,
18 :profit_interval,

19 :tick_size,
20 :step_size # <= add this line and comma above
21 ]
22 end
23 ...

Calculate quantity
Jumping back to the handle_info/2 where the [Link] places a buy order, we need to pattern
match on the step_size and budget then we will be able to swap hardcoded quantity with result of
calling the calculate_quantity/3 function:

1 # /apps/naive/lib/naive/trader.ex
2 ...
3 def handle_info(
4 %TradeEvent{price: price},
5 %State{
6 symbol: symbol,
7 budget: budget, # <= add this line
8 buy_order: nil,
9 buy_down_interval: buy_down_interval,
10 tick_size: tick_size,
11 step_size: step_size # <= add this line
12 } = state
13 ) do
14 ...
15 quantity = calculate_quantity(budget, price, step_size)
16 ...

To calculate the quantity, we will just divide the budget by the price, with a caveat: it's possible (as
with calculating the price) that the result is not a legal quantity value, as it needs to be divisible by the step_size:

1 # /apps/naive/lib/naive/trader.ex
2 # add below at the bottom of the file
3 ...
4 defp calculate_quantity(budget, price, step_size) do
5 price = D.from_float(price)
6
7 # not necessarily legal quantity
8 exact_target_quantity = D.div(budget, price)
9

10 D.to_float(
11 D.mult(
12 D.div_int(exact_target_quantity, step_size),
13 step_size
14 )
15 )
16 end
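The numbers can be sanity-checked with a standalone sketch (not part of the project files; it assumes Elixir 1.12+ for Mix.install and the `decimal` hex package, and the 0.1 step_size is an example value):

```elixir
Mix.install([{:decimal, "~> 2.0"}])
alias Decimal, as: D

budget = D.new(20)            # a single trader's chunk of the budget
price = D.from_float(0.29506) # price from the trade event
step_size = D.new("0.1")      # example LOT_SIZE step

# 20 / 0.29506 = 67.783... - not divisible by the step_size
exact_target_quantity = D.div(budget, price)

# round down to the nearest multiple of the step_size
quantity = D.mult(D.div_int(exact_target_quantity, step_size), step_size)
IO.inspect(D.to_float(quantity)) # => 67.7
```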

IEx testing
That finishes the quantity (and budget) implementation; we will jump into an iex session to see how it
works.
First, start the streaming and trading on the same symbol, and a moment later you should see a variable
quantity that more or less uses the full allowed budget:

1 $ iex -S mix
2 ...
3 iex(1)> Streamer.start_streaming("XRPUSDT")
4 {:ok, #PID<0.313.0>}
5 iex(2)> Naive.start_trading("XRPUSDT")
6 [Link].829 [info] Starting new supervision tree to trade on XRPUSDT
7 [Link].755 [info] Initializing new trader for XRPUSDT
8 [Link].009 [info] Placing BUY order for XRPUSDT @ 0.29506, quantity: 67.7
9 [Link].456 [info] Buy order filled, placing SELL order for XRPUSDT @ 0.29529, qua\
10 ntity: 67.7

As we can see, our Naive.Trader process is now buying and selling based on the passed budget.
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github²⁹
²⁹[Link]
Chapter 8 - Add support for multiple
transactions per order
Objectives
• describe issue with current implementation
• improve buy order filled callback
• implement buy order “filled” callback
• improve sell order callback

Issue with the current implementation


Currently, the Naive.Trader process is placing a buy order and assuming that it will be filled by a
single opposite sell order (we are pattern matching on the quantity to confirm that):

Single transaction fills order

Here we can see our buy order for 1000 units (on the left) and another trader's sell order (on the right)
for 1000 units. This (an order fully filled by a single transaction) is the case most of the time, but it's not
ALWAYS the case.
Sometimes our order will be filled by two or more transactions:

Multiple transactions fills order

The easiest and safest way to check whether this event has fully filled our order is to fetch our order again
from Binance at the moment when a trade event filling our order arrives.
The problem with this approach is that sometimes we will run into a race condition:

Race condition timeline - two sell orders

From the left: first, we are sending a buy order for a quantity of 1000 to Binance. It hangs for a while
until it gets filled by 2 transactions that happened very quickly - quickly enough for us to receive
both messages almost at the same moment.
When our bot handles the first one, it will fetch our buy order, which is already filled. That will
cause the trader to place a sell order, but then there's another trade event waiting in the message
box. It will be handled by another callback that will again fetch the order and place yet another sell order,
which is obviously not correct.

What we need to do is update the status of the buy order after the first fetch (if it's filled), so when the
second trade event arrives, we will ignore it (this will require an additional callback).
The same issue will appear when placing a sell order and dealing with multiple simultaneous transactions.

Improve buy order filled callback


First, we need to modify the callback which monitors incoming trade events for ones filling its buy
order and then places a sell order. We need to remove the pattern matching assuming that a single trade
event will fill our buy order - we need to drop the quantity check, add the transact_time to the pattern
(we will use it to query the order), and assign the buy order to a variable (to update it later):

1 # /apps/naive/lib/naive/trader.ex
2 def handle_info(
3 %TradeEvent{
4 buyer_order_id: order_id # <= quantity got removed from here
5 },
6 %State{
7 symbol: symbol,
8 buy_order:
9 %Binance.OrderResponse{
10 price: buy_price,
11 order_id: order_id,
12 orig_qty: quantity,
13 transact_time: timestamp # <= timestamp added to query order
14 } = buy_order, # <= buy order to update it
15 profit_interval: profit_interval,
16 tick_size: tick_size
17 } = state
18 ) do

Now we can fetch our buy order to check whether it is already filled. We will get the Binance.Order struct
back instead of the Binance.OrderResponse that we normally deal with. At this moment, we will simply
update the status of the Binance.OrderResponse struct from the state:

1 # /apps/naive/lib/naive/trader.ex
2 # inside the same callback
3 def handle_info(
4 ...
5 ) do
6 {:ok, %Binance.Order{} = current_buy_order} =
7 @binance_client.get_order(
8 symbol,
9 timestamp,
10 order_id
11 )
12
13 buy_order = %{buy_order | status: current_buy_order.status}
14 ...

The rest of the logic inside this callback will depend on the status of the buy order. If our buy
order is "filled", we would like to follow the existing logic but also update the buy_order field inside the
state of the trader process. On the other hand, if our order is not yet filled, the only thing to do is to
update the buy_order field inside the state of the Trader process. Here's the updated body below the above
changes (a few variables got renamed for clarity, as we now fetch the order):

1 # /apps/naive/lib/naive/trader.ex
2 # inside the same callback
3 buy_order = ...
4
5 {:ok, new_state} =
6 if buy_order.status == "FILLED" do
7 sell_price = calculate_sell_price(buy_price, profit_interval, tick_size)
8
9 Logger.info(
10 "Buy order filled, placing SELL order for " <>
11 "#{symbol} @ #{sell_price}, quantity: #{quantity}"
12 )
13
14 {:ok, %Binance.OrderResponse{} = order} =
15 @binance_client.order_limit_sell(symbol, quantity, sell_price, "GTC")
16
17 {:ok, %{state | buy_order: buy_order, sell_order: order}}
18 else
19 Logger.info("Buy order partially filled")
20 {:ok, %{state | buy_order: buy_order}}
21 end
22

23 Naive.Leader.notify(:trader_state_updated, new_state)
24 {:noreply, new_state}
25 end

As we are branching our logic and both paths are updating the state, we will return it together with
an :ok atom to be able to pattern match on it and assign it as the new state.

Implement buy order “filled” callback


The above callback covers the case where we get multiple transactions filling our buy order. We
aren't yet covering the race condition described at the beginning of this chapter. When another
trade event matching the buyer_order_id arrives, the above callback would be used and another sell
order would be placed. To avoid that, we need to add a new callback ABOVE the one that we just
edited that will match the buyer_order_id together with the "filled" status, and it will simply ignore that
trade event, as we know that the sell order has already been placed by a previous trade event:

1 # /apps/naive/lib/naive/trader.ex
2 # place this callback ABOVE callback from previous section
3 def handle_info(
4 %TradeEvent{
5 buyer_order_id: order_id
6 },
7 %State{
8 buy_order: %Binance.OrderResponse{
9 order_id: order_id, # <= confirms that it's event for buy order
10 status: "FILLED" # <= confirms buy order filled
11 },
12 sell_order: %Binance.OrderResponse{} # <= confirms sell order placed
13 } = state
14 ) do
15 {:noreply, state}
16 end

Improve sell order callback


Let's move on to the callback where the trader receives a trade event matching the sell order's id (about
line 135 inside the Naive.Trader module).
We need to modify the header of our callback in the following ways:

• drop both pattern matches on quantity, as we already know that a trade event could partially
fill our order (#1)

• get the symbol out of the state (#2)
• get the transact_time out of the sell_order (used by get_order) (#3)
• assign the sell_order to a variable (#4)

1 # /apps/naive/lib/naive/trader.ex
2 def handle_info(
3 %TradeEvent{
4 seller_order_id: order_id # `quantity` check removed below (#1)
5 },
6 %State{
7 symbol: symbol, (#2)
8 sell_order:
9 %Binance.OrderResponse{
10 order_id: order_id,
11 transact_time: timestamp # `transact_time` to `get_order` (#3)
12 } = sell_order # to update order (#4)
13 } = state
14 ) do

Moving lower to the body of the function, we need to:

• fetch the current state of our sell order
• update the status of the sell_order in the Trader's state
• branch out the logic based on the status of the sell_order:
– log and return the :stop atom to stop the GenServer
or
– update the state with the new sell_order and continue

Here’s the full body of our callback:

1 # /apps/naive/lib/naive/trader.ex
2 # inside the callback
3 {:ok, %Binance.Order{} = current_sell_order} =
4 @binance_client.get_order(
5 symbol,
6 timestamp,
7 order_id
8 )
9
10 sell_order = %{sell_order | status: current_sell_order.status}
11

12 if sell_order.status == "FILLED" do
13 Logger.info("Trade finished, trader will now exit")
14 {:stop, :normal, state}
15 else
16 Logger.info("Sell order partially filled")
17 new_state = %{state | sell_order: sell_order}
18 {:noreply, new_state}
19 end

Test the implementation


Testing this feature is a bit tricky, as it requires trading on the real Binance exchange (our BinanceMock
always fills orders with a single transaction) as well as a race condition to happen :) Not that easy, but
even without the race condition we should still test that the code works as expected with the BinanceMock:

1 $ iex -S mix
2 ...
3 iex(1)> Naive.start_trading("XRPUSDT")
4 [Link].977 [info] Starting new supervision tree to trade on XRPUSDT
5 {:ok, #PID<0.331.0>}
6 [Link].073 [info] Initializing new trader for XRPUSDT
7 iex(2)> Streamer.start_streaming("XRPUSDT")
8 {:ok, #PID<0.345.0>}
9 [Link].044 [info] Initializing new trader for XRPUSDT
10 [Link].888 [info] Placing BUY order for XRPUSDT @ 0.28031, quantity: 71.3
11 [Link].023 [info] Buy order filled, placing SELL order for XRPUSDT @ 0.28053, qua\
12 ntity: 71.30000000
13 [Link].865 [info] Trade finished, trader will now exit
14 [Link].865 [info] XRPUSDT Trader finished - restarting

[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github³⁰
³⁰[Link]
Chapter 9 - Run multiple traders in
parallel
Objectives
• describe and design the required functionality
• implement rebuy in the Naive.Trader
• implement rebuy in the Naive.Leader
• improve logs by assigning ids to traders

Describe and design the required functionality


At this moment, inside the Naive.Leader, we have silly code that
starts all of the traders at the same moment:

1 # /apps/naive/lib/naive/leader.ex
2 ...
3 traders =
4 for _i <- 1..settings.chunks,
5 do: start_new_trader(trader_state)
6 ...

All the changes we will make in this chapter will enable us to fix that.


Let's say that we placed a buy order that got filled, and the price fell before reaching the sell level.
We can see here that we missed a nice opportunity to buy more as the price dropped, and to make money as
it climbed back:

Single trader

We will implement an additional trade event callback inside the Naive.Trader that will keep checking
the price after the buy order has been filled. Whenever the price drops below the buy_order's price by
the rebuy_interval, we will notify the Naive.Leader to start a new Naive.Trader process:

Multiple traders

The Naive.Leader keeps track of how many Naive.Traders are running and needs to honor the
number of chunks set up in the settings (one chunk == one trader).
To stop the Naive.Trader from continuously notifying about a drop in the price, we will also
introduce a boolean flag that will track whether the Naive.Leader has already been notified.

Implement rebuy inside the Naive.Trader


We will start by adding the rebuy_interval and the rebuy_notified to the trader's state:

1 # /apps/naive/lib/naive/trader.ex
2 ...
3 defmodule State do
4 @enforce_keys [
5 :symbol,
6 :budget,
7 :buy_down_interval,
8 :profit_interval,
9 :rebuy_interval, # <= add this field
10 :rebuy_notified, # <= add this field
11 :tick_size,
12 :step_size
13 ]
14 defstruct [
15 :symbol,
16 :budget,
17 :buy_order,
18 :sell_order,
19 :buy_down_interval,
20 :profit_interval,
21 :rebuy_interval, # <= add this field
22 :rebuy_notified, # <= add this field
23 :tick_size,
24 :step_size
25 ]
26 end

The rebuy logic should be placed as almost the last callback, just before the one that ignores all events. We
will need to retrieve the current price and the buy_price, and confirm that we didn't notify the leader
yet (the rebuy_notified flag):

1 # /apps/naive/lib/naive/trader.ex
2 ...
3 # sell filled callback here
4 ...
5 def handle_info(
6 %TradeEvent{
7 price: current_price
8 },
9 %State{
10 symbol: symbol,
11 buy_order: %Binance.OrderResponse{
12 price: buy_price

13 },
14 rebuy_interval: rebuy_interval,
15 rebuy_notified: false
16 } = state
17 ) do
18
19 end
20
21 # catch all callback here

We need to calculate whether the current price dropped below the rebuy interval. If it did, we will notify the leader
and update the boolean flag. We will abstract the calculation into a separate function (for readability) that
we will write below:

1 # /apps/naive/lib/naive/trader.ex
2 # body of the above callback
3 if trigger_rebuy?(buy_price, current_price, rebuy_interval) do
4 Logger.info("Rebuy triggered for #{symbol} trader")
5 new_state = %{state | rebuy_notified: true}
6 Naive.Leader.notify(:rebuy_triggered, new_state)
7 {:noreply, new_state}
8 else
9 {:noreply, state}
10 end

As mentioned before, we will set the rebuy_notified boolean flag to true and notify the leader using
the notify function with a dedicated atom.
At the bottom of the module, we need to add the trigger_rebuy? helper function:

1 # /apps/naive/lib/naive/trader.ex
2 # bottom of the module
3 defp trigger_rebuy?(buy_price, current_price, rebuy_interval) do
4 current_price = D.from_float(current_price)
5 buy_price = D.new(buy_price)
6
7 rebuy_price =
8 D.sub(
9 buy_price,
10 D.mult(buy_price, rebuy_interval)
11 )
12
13 D.lt?(current_price, rebuy_price)
14 end
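To see the helper in action, here's a standalone sketch using prices in the range of this chapter's BTCUSDT session (not part of the project files; it assumes Elixir 1.12+ for Mix.install and the `decimal` hex package):

```elixir
Mix.install([{:decimal, "~> 2.0"}])
alias Decimal, as: D

buy_price = D.new("37177.9")    # price of our filled buy order
rebuy_interval = D.new("0.001") # 0.1%

# rebuy threshold: 37177.9 - (37177.9 * 0.001) = 37140.7221
rebuy_price = D.sub(buy_price, D.mult(buy_price, rebuy_interval))

IO.inspect(D.lt?(D.from_float(37130.0), rebuy_price)) # => true  (notify the leader)
IO.inspect(D.lt?(D.from_float(37150.0), rebuy_price)) # => false (do nothing)
```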

Implement rebuy in the Naive.Leader


Moving on to the Naive.Leader module, we can change the leader from starting all of the traders
automatically to starting just one inside handle_continue:

1 # /apps/naive/lib/naive/leader.ex
2 def handle_continue(:start_traders, %{symbol: symbol} = state) do
3 ...
4 traders = [start_new_trader(trader_state)] # <= updated part
5
6 ...
7 end

We will need to add a new clause of the notify function that will handle the rebuy scenario:

1 # /apps/naive/lib/naive/leader.ex
2 # add below current `notify` function
3 def notify(:rebuy_triggered, trader_state) do
4 GenServer.call(
5 :"#{__MODULE__}-#{trader_state.symbol}",
6 {:rebuy_triggered, trader_state}
7 )
8 end

We need to add a new handle_call function that will start new traders only when there are still
chunks available (to enforce the maximum number of parallel
traders) - let's start with a header:

1 # /apps/naive/lib/naive/leader.ex
2 # place this one after :update_trader_state handle_call
3 def handle_call(
4 {:rebuy_triggered, new_trader_state},
5 {trader_pid, _},
6 %{traders: traders, symbol: symbol, settings: settings} = state
7 ) do
8
9 end

There are a few important details to make note of:

• we need the trader's pid to be able to find its details inside the list of traders
• we need the settings to confirm the number of chunks to be able to limit the maximum number of parallel
traders

Moving on to the body of our callback. As with the other ones, we will check whether we can find the trader inside
the list of traders, and based on that, we will either start another one (if we didn't reach the limit of
chunks) or ignore the notification:

1 # /apps/naive/lib/naive/leader.ex
2 # body of our callback
3 case Enum.find_index(traders, &(&1.pid == trader_pid)) do
4 nil ->
5 Logger.warn("Rebuy triggered by trader that leader is not aware of")
6 {:reply, :ok, state}
7
8 index ->
9 traders =
10 if settings.chunks == length(traders) do
11 Logger.info("All traders already started for #{symbol}")
12 traders
13 else
14 Logger.info("Starting new trader for #{symbol}")
15 [start_new_trader(fresh_trader_state(settings)) | traders]
16 end
17
18 old_trader_data = Enum.at(traders, index)
19 new_trader_data = %{old_trader_data | :state => new_trader_state}
20 new_traders = List.replace_at(traders, index, new_trader_data)
21
22 {:reply, :ok, %{state | :traders => new_traders}}
23 end

In the above code, we need to remember to update the state of the trader that triggered the rebuy inside
the traders list, as well as add the state of the new trader to that list.
As with the other settings, we will hardcode the rebuy_interval (inside the fetch_symbol_settings
function) and assign it to the
trader's state (inside the fresh_trader_state function):

1 # /apps/naive/lib/naive/leader.ex
2 defp fresh_trader_state(settings) do
3 %{
4 struct(Trader.State, settings)
5 | budget: D.div(settings.budget, settings.chunks),
6 rebuy_notified: false # <= add this line
7 }
8 end
9
10 defp fetch_symbol_settings(symbol) do
11 ...
12
13 Map.merge(
14 %{
15 ...
16 chunks: 5, # <= update to 5 parallel traders max
17 budget: Decimal.new("100"), # <= update this line
18 ...
19 profit_interval: Decimal.new("-0.0012"),
20 rebuy_interval: Decimal.new("0.001") # <= add this line
21 },
22 symbol_filters
23 )
24 end

We also update the chunks and the budget to allow our strategy to start up to 5 parallel traders with a
budget of 20 USDT each (100/5), as Binance has a minimal order requirement of about $15 (when using
the BinanceMock this doesn't really matter).

Improve logs by assigning ids to traders


The final change will be to add an id to the trader's state, so we can use it inside log messages to give
us meaningful data about what's going on (otherwise we won't be able to tell which message was
logged by which trader).
First, let's add the id inside the Naive.Leader's fresh_trader_state, as it will be defined per trader:

1 # /apps/naive/lib/naive/leader.ex
2 defp fresh_trader_state(settings) do
3 %{
4 struct(Trader.State, settings)
5 | id: :os.system_time(:millisecond), # <= add this line
6 budget: D.div(settings.budget, settings.chunks),
7 rebuy_notified: false
8 }
9 end

Now we can move on to the Naive.Trader and add the id to the %State{} struct, as well as modify
every callback to include it inside log messages:

1 # /apps/naive/lib/naive/trader.ex
2 defmodule State do
3 @enforce_keys [
4 :id,
5 ...
6 ]
7 defstruct [
8 :id,
9 ...
10 ]
11 end
12
13 ...
14
15 def init(%State{id: id, symbol: symbol} = state) do
16 ...
17
18 Logger.info("Initializing new trader(#{id}) for #{symbol}")
19
20 ...
21 end
22
23 def handle_info(
24 %TradeEvent{price: price},
25 %State{
26 id: id,
27 ...
28 } = state
29 ) do
30 ...

31
32 Logger.info(
33 "The trader(#{id}) is placing a BUY order " <>
34 "for #{symbol} @ #{price}, quantity: #{quantity}"
35 )
36
37 ...
38 end
39
40 def handle_info(
41 %TradeEvent{
42 buyer_order_id: order_id
43 },
44 %State{
45 id: id,
46 ...
47 } = state
48 ) do
49 ...
50 Logger.info(
51 "The trader(#{id}) is placing a SELL order for " <>
52 "#{symbol} @ #{sell_price}, quantity: #{quantity}."
53 )
54 ...
55 Logger.info("Trader's(#{id}) #{symbol} BUY order got partially filled")
56 ...
57 end
58
59 def handle_info(
60 %TradeEvent{
61 seller_order_id: order_id
62 },
63 %State{
64 id: id,
65 ...
66 } = state
67 ) do
68 ...
69 Logger.info("Trader(#{id}) finished trade cycle for #{symbol}")
70 ...
71 Logger.info("Trader's(#{id}) #{symbol} SELL order got partially filled")
72 ...
73 end

74
75 def handle_info(
76 %TradeEvent{
77 price: current_price
78 },
79 %State{
80 id: id,
81 ...
82 } = state
83 ) do
84 ...
85 Logger.info("Rebuy triggered for #{symbol} by the trader(#{id})")
86 ...
87 end

That finishes the implementation part - we should now be able to test it and see
a dynamically growing number of traders for our strategy based on price movement.

Test the implementation


Let's start an iex session and open the :observer (inside it, go to the "Applications" tab and click on
naive in the list on the left) so we will be able to see how the number of traders grows, as well
as how the PIDs change, which means that traders are finishing full trade cycles:

1 $ iex -S mix
2 ...
3 iex(1)> :observer.start()
4 ...
5 [Link].223 [info] Starting new supervision tree to trade on BTCUSDT
6 {:ok, #PID<0.338.0>}
7 [Link].055 [info] Initializing new trader(1610973417051) for BTCUSDT
8 iex(6)> Streamer.start_streaming("BTCUSDT")
9 {:ok, #PID<0.352.0>}
10 [Link].176 [info] The trader(1610973417051) is placing a BUY order for BTCUSDT @ \
11 37177.9, quantity: 5.37e-4
12 [Link].426 [info] The trader(1610973417051) is placing a SELL order for BTCUSDT @\
13 37207.59, quantity: 5.37e-4.
14 [Link].298 [info] Rebuy triggered for BTCUSDT by the trader(1610973417051)
15 [Link].299 [info] Starting new trader for BTCUSDT
16 [Link].299 [info] Initializing new trader(1610973431299) for BTCUSDT
17 [Link].305 [info] The trader(1610973431299) is placing a BUY order for BTCUSDT @ \
18 37136.28, quantity: 5.38e-4

19 [Link].351 [info] The trader(1610973431299) is placing a SELL order for BTCUSDT @\


20 37165.93, quantity: 5.38e-4.
21 [Link].005 [info] Trader(1610973431299) finished trade cycle for BTCUSDT
22 [Link].005 [info] BTCUSDT trader finished trade - restarting
23 [Link].006 [info] Initializing new trader(1610973451005) for BTCUSDT
24 [Link].107 [info] The trader(1610973451005) is placing a BUY order for BTCUSDT @ \
25 37164.14, quantity: 5.38e-4
26 [Link].152 [info] The trader(1610973451005) is placing a SELL order for BTCUSDT @\
27 37193.81, quantity: 5.38e-4.
28 [Link].047 [info] Trader(1610973451005) finished trade cycle for BTCUSDT
29 [Link].047 [info] BTCUSDT trader finished trade - restarting
30 [Link].047 [info] Initializing new trader(1610973471047) for BTCUSDT
31 [Link].103 [info] The trader(1610973471047) is placing a BUY order for BTCUSDT @ \
32 37185.93, quantity: 5.37e-4
33 [Link].643 [info] The trader(1610973471047) is placing a SELL order for BTCUSDT @\
34 37215.62, quantity: 5.37e-4.
35 [Link].543 [info] Rebuy triggered for BTCUSDT by the trader(1610973471047)
36 [Link].543 [info] Starting new trader for BTCUSDT
37 [Link].543 [info] Initializing new trader(1610973482543) for BTCUSDT
38 [Link].578 [info] The trader(1610973482543) is placing a BUY order for BTCUSDT @ \
39 37149.98, quantity: 5.38e-4
40 [Link].642 [info] The trader(1610973482543) is placing a SELL order for BTCUSDT @\
41 37179.64, quantity: 5.38e-4.
42 [Link].407 [info] Rebuy triggered for BTCUSDT by the trader(1610973482543)
43 [Link].407 [info] Starting new trader for BTCUSDT
44 [Link].407 [info] Initializing new trader(1610973522407) for BTCUSDT
45 [Link].418 [info] The trader(1610973522407) is placing a BUY order for BTCUSDT @ \
46 37108.89, quantity: 5.38e-4
47 [Link].590 [info] The trader(1610973522407) is placing a SELL order for BTCUSDT @\
48 37138.52, quantity: 5.38e-4.
49 [Link].662 [info] Rebuy triggered for BTCUSDT by the trader(1610973522407)
50 [Link].662 [info] Starting new trader for BTCUSDT
51 [Link].662 [info] Initializing new trader(1610973600662) for BTCUSDT
52 [Link].681 [info] The trader(1610973600662) is placing a BUY order for BTCUSDT @ \
53 37067.97, quantity: 5.39e-4
54 [Link].288 [info] The trader(1610973600662) is placing a SELL order for BTCUSDT @\
55 37097.57, quantity: 5.39e-4.
56 [Link].818 [info] Rebuy triggered for BTCUSDT by the trader(1610973600662)

And our observer shows the following:



Observer shows 5 parallel traders

We can clearly see that our strategy dynamically scaled from 1 to 5 parallel traders and they were
going through different trading steps without any problems - I think that’s really cool to see and it
wasn’t difficult to achieve in Elixir.
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github³¹
³¹[Link]
Chapter 10 - Fine-tune trading
strategy per symbol
Objectives
• describe and design the required functionality
• add docker to project
• set up ecto inside the naive app
• create and migrate the DB
• seed symbols’ settings
• update the Naive.Leader to fetch settings

Describe and design the required functionality


At this moment, the settings of our naive strategy are hardcoded inside the Naive.Leader module:

1 # /apps/naive/lib/naive/leader.ex
2 ...
3 defp fetch_symbol_settings(symbol) do
4 symbol_filters = fetch_symbol_filters(symbol)
5
6 Map.merge(
7 %{
8 symbol: symbol, # <=
9 chunks: 5, # <=
10 budget: Decimal.new("100"), # <=
11 buy_down_interval: Decimal.new("0.0001"), # <= all of those settings
12 # -0.12% for quick testing # <=
13 profit_interval: Decimal.new("-0.0012"), # <=
14 rebuy_interval: Decimal.new("0.001") # <=
15 },
16 symbol_filters
17 )
18 end
19 ...

The problem is that these values are hardcoded, so there’s currently no flexibility to define them
per symbol.
In this chapter, we will move them out of this file into the Postgres database.

Add docker to project


The requirements for this section are docker and docker-compose installed on your system.
Inside the main directory of our project, create a new file called docker-compose.yml and fill it with
the below details:

1 # /docker-compose.yml
2 version: "3.2"
3 services:
4 db:
5 image: postgres:latest
6 restart: always
7 environment:
8 POSTGRES_PASSWORD: "postgres"
9 ports:
10 - 5432:5432
11 volumes:
12 - ../postgres-data:/var/lib/postgresql/data

If you are new to docker here’s the gist of what the above will do:

• it will start a single service called “db”
• the “db” service will use the latest version of the postgres image inside a docker container
(the latest version as tagged on Docker Hub)
• we map TCP port 5432 on the Docker host to port 5432 in the container (the format is
host_port:container_port)
• we set an environment variable inside the docker container that will be used by the Postgres
app as a password for the default (postgres) user
• the volumes option maps a directory from inside of the container to the host. This way we will
keep the state of the database between restarts.

We can now start the service using docker-compose:

1 $ docker-compose up -d
2 Creating hedgehog_db_1 ... done

To validate that it works we can run:



1 $ docker ps -a
2 CONTAINER ID IMAGE COMMAND CREATED STATUS \
3 PORTS NAMES
4 98558827b80b postgres:latest "docker-entrypoint.s…" 4 seconds ago Up 4 sec\
5 onds 0.0.0.0:5432->5432/tcp hedgehog_db_1

Set up ecto inside the naive app


Let’s start by adding database access to the naive application. The first step is to add the Ecto³²
module together with the Postgrex³³ (Ecto’s Postgres driver) package to the deps function inside the mix.exs
file. As we are going to use Enums inside Postgres, we need to add the EctoEnum³⁴ module as well:

1 # /apps/naive/mix.exs
2 defp deps do
3 [
4 {:binance, "~> 0.7.1"},
5 {:binance_mock, in_umbrella: true},
6 {:decimal, "~> 2.0"},
7 {:ecto_sql, "~> 3.0"}, # <= New line
8 {:ecto_enum, "~> 1.4"}, # <= New line
9 {:phoenix_pubsub, "~> 2.0"},
10 {:postgrex, ">= 0.0.0"}, # <= New line
11 {:streamer, in_umbrella: true}
12 ]
13 end

Remember to install those deps by running:

1 $ mix deps.get

We can now use the ecto generator to add an ecto repository to the Naive application:

³²[Link]
³³[Link]
³⁴[Link]

1 $ cd apps/naive
2 $ mix ecto.gen.repo -r Naive.Repo
3 * creating lib/naive
4 * creating lib/naive/repo.ex
5 * updating ../../config/config.exs
6 Don't forget to add your new repo to your supervision tree
7 (typically in lib/naive/application.ex):
8
9 {Naive.Repo, []}
10
11 And to add it to the list of Ecto repositories in your
12 configuration files (so Ecto tasks work as expected):
13
14 config :naive,
15 ecto_repos: [Naive.Repo]

Back in the IDE, the generator updated our config/config.exs file with default access details
to the database. We need to modify them to point to our Postgres docker instance, as well as add a
list of ecto repositories for our naive app (as per the instructions above):

1 # /config/config.exs
2 config :naive, # <= added line
3 ecto_repos: [Naive.Repo], # <= added line
4 binance_client: BinanceMock # <= merged from existing config
5
6 config :naive, Naive.Repo,
7 database: "naive", # <= updated
8 username: "postgres", # <= updated
9 password: "postgres", # <= updated
10 hostname: "localhost"

Here we can use localhost, as inside the docker-compose.yml file we defined port forwarding
from the container to the host (Postgres is available at localhost:5432). We also merged the existing
binance_client setting together with the new ecto_repos setting.

The last step to be able to communicate with the database using Ecto will be to add the Naive.Repo
module (created by the generator) to the children list of the Naive.Application module:

1 # /apps/naive/lib/naive/application.ex
2 ...
3 def start(_type, _args) do
4 children = [
5 {Naive.Repo, []}, # <= added line
6 {
7 DynamicSupervisor,
8 strategy: :one_for_one, name: Naive.DynamicSymbolSupervisor
9 }
10 ]
11 ...

Create and migrate the DB


We can now create a new naive database using the mix tool. After that, we will be able to generate a
migration file that will create the settings table:

1 $ mix ecto.create -r Naive.Repo
2 The database for Naive.Repo has been created
3 $ cd apps/naive
4 $ mix ecto.gen.migration create_settings
5 * creating priv/repo/migrations
6 * creating priv/repo/migrations/20210202223209_create_settings.exs

We can now copy the current hardcoded settings from the Naive.Leader module and use them as
a column list of our new settings table. All of the below alterations need to be done inside the
change function of our migration file:

1 # /apps/naive/priv/repo/migrations/20210202223209_create_settings.exs
2 ...
3 def change do
4 create table(:settings) do
5 add(:symbol, :text, null: false)
6 add(:chunks, :integer, null: false)
7 add(:budget, :decimal, null: false)
8 add(:buy_down_interval, :decimal, null: false)
9 add(:profit_interval, :decimal, null: false)
10 add(:rebuy_interval, :decimal, null: false)
11 end
12 end

At this moment, we have just copied the settings and converted them to columns using the add function.
We now need to take care of the id column. We need to pass primary_key: false to the create
table macro to stop it from creating the default integer-based id column. Instead, we will
define the id column ourselves with the :uuid type and pass a flag indicating that it’s the primary
key of the settings table:

1 # /apps/naive/priv/repo/migrations/20210202223209_create_settings.exs
2 ...
3 create table(:settings, primary_key: false) do
4 add(:id, :uuid, primary_key: true)
5 ...

We will also add create and update timestamps that come as a bundle when using the timestamps()
function inside the create table macro:

1 # /apps/naive/priv/repo/migrations/20210202223209_create_settings.exs
2 ...
3 create table(...) do
4 ...
5
6 timestamps() # <= both create and update timestamps
7 end
8 ...

We will add a unique index on the symbol column to avoid any possible duplicates:

1 # /apps/naive/priv/repo/migrations/20210202223209_create_settings.exs
2 ...
3 create table(...) do
4 ...
5 end
6
7 create(unique_index(:settings, [:symbol]))
8 end
9 ...

We will now add the status field, which will be an Enum. It will be defined inside a separate file
and aliased from our migration; this way we will be able to use it both from within the migration and
inside the lib code. First, we will apply changes to our migration and then we will move on to
creating the Enum module.
Here’s the full implementation of migration for reference:

1 # /apps/naive/priv/repo/migrations/20210202223209_create_settings.exs
2 defmodule Naive.Repo.Migrations.CreateSettings do
3 use Ecto.Migration
4
5 alias Naive.Schema.TradingStatusEnum
6
7 def change do
8 TradingStatusEnum.create_type()
9
10 create table(:settings, primary_key: false) do
11 add(:id, :uuid, primary_key: true)
12 add(:symbol, :text, null: false)
13 add(:chunks, :integer, null: false)
14 add(:budget, :decimal, null: false)
15 add(:buy_down_interval, :decimal, null: false)
16 add(:profit_interval, :decimal, null: false)
17 add(:rebuy_interval, :decimal, null: false)
18 add(:status, TradingStatusEnum.type(), default: "off", null: false)
19
20 timestamps()
21 end
22
23 create(unique_index(:settings, [:symbol]))
24 end
25 end

That finishes our work on the migration file. We will now focus on the TradingStatusEnum
implementation. First, we need to create a schema directory inside the apps/naive/lib/naive directory
with a file called trading_status_enum.ex, and place the below logic (defining the enum) in it:

1 # /apps/naive/lib/naive/schema/trading_status_enum.ex
2 import EctoEnum
3
4 defenum(Naive.Schema.TradingStatusEnum, :trading_status, [:on, :off])

We used the defenum macro from the ecto_enum module to define our enum. It’s interesting to point
out that we didn’t need to define a new module, as the defenum macro takes care of that for us.
Let’s run the migration to create the table, unique index, and the enum:

1 $ mix ecto.migrate
2 [Link].757 [info] == Running 20210202223209 Naive.Repo.Migrations.CreateSettings.\
3 change/0 forward
4 [Link].759 [info] execute "CREATE TYPE public.trading_status AS ENUM ('on', 'off'\
5 )"
6 [Link].760 [info] create table settings
7 [Link].820 [info] create index settings_symbol_index
8 [Link].829 [info] == Migrated 20210202223209 in 0.0s

We can now create a schema file for the settings table. Inside the /apps/naive/lib/naive/schema
directory, create a file called settings.ex. We will start with a skeleton implementation of the schema
file, with the list of columns copied from the migration and converted to Ecto’s types using its docs³⁵:

1 # /apps/naive/lib/naive/schema/settings.ex
2 defmodule Naive.Schema.Settings do
3 use Ecto.Schema
4
5 alias Naive.Schema.TradingStatusEnum
6
7 @primary_key {:id, :binary_id, autogenerate: true}
8
9 schema "settings" do
10 field(:symbol, :string)
11 field(:chunks, :integer)
12 field(:budget, :decimal)
13 field(:buy_down_interval, :decimal)
14 field(:profit_interval, :decimal)
15 field(:rebuy_interval, :decimal)
16 field(:status, TradingStatusEnum)
17
18 timestamps()
19 end
20 end

Seed symbols’ settings


So we have all the pieces of the implementation in place to create the DB, migrate the settings table, and
query it using Ecto. To be able to drop the hardcoded settings from the Naive.Leader, we will need
to fill our database with the “default” settings for each symbol. To achieve that, we will define default
settings inside the config/config.exs file and we will create a seed script that will fetch all pairs
from Binance and insert a new config row inside the DB for each one.
³⁵[Link]

Let’s start by adding those default values to the config file (we will merge them into the structure
defining binance_client and ecto_repos):

1 # config/config.exs
2 config :naive,
3 ecto_repos: [Naive.Repo],
4 binance_client: BinanceMock,
5 trading: %{
6 defaults: %{
7 chunks: 5,
8 budget: 1000.0,
9 buy_down_interval: 0.0001,
10 profit_interval: -0.0012,
11 rebuy_interval: 0.001
12 }
13 }

Moving on to the seeding script, we need to create a new file called seed_settings.exs inside the
/apps/naive/priv/ directory. Let’s start by aliasing the required modules and requiring
the Logger:

1 # /apps/naive/priv/seed_settings.exs
2 require Logger
3
4 alias Decimal
5 alias Naive.Repo
6 alias Naive.Schema.Settings

Next, we will get the Binance client from the config:

1 # /apps/naive/priv/seed_settings.exs
2 ...
3 binance_client = Application.get_env(:naive, :binance_client)

Now, it’s time to fetch all the symbols(pairs) that Binance supports:

1 # /apps/naive/priv/seed_settings.exs
2 ...
3 Logger.info("Fetching exchange info from Binance to create trading settings")
4
5 {:ok, %{symbols: symbols}} = binance_client.get_exchange_info()

Now we need to fetch default trading settings from the config file as well as the current timestamp:

1 # /apps/naive/priv/seed_settings.exs
2 ...
3 %{
4 chunks: chunks,
5 budget: budget,
6 buy_down_interval: buy_down_interval,
7 profit_interval: profit_interval,
8 rebuy_interval: rebuy_interval
9 } = Application.get_env(:naive, :trading).defaults
10
11 timestamp = NaiveDateTime.utc_now()
12 |> NaiveDateTime.truncate(:second)
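Since we build the timestamps ourselves, note why the truncate/2 call is there: NaiveDateTime.utc_now/0 carries microsecond precision by default, while the timestamps() columns created by our migration expect second precision. A quick standalone sketch of what truncation does:

```elixir
# utc_now/0 returns a NaiveDateTime with microsecond precision,
# e.g. ~N[2021-02-02 21:14:08.123456]
now = NaiveDateTime.utc_now()

# Truncating to :second drops the sub-second part entirely.
truncated = NaiveDateTime.truncate(now, :second)

# The microsecond field is now {0, 0} - zero value, zero precision.
{0, 0} = truncated.microsecond
```

This keeps the values we insert via Repo.insert_all/3 consistent with what Ecto itself would have written into inserted_at and updated_at.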

We will use the default settings for all rows to be able to insert data into the database. Normally we
wouldn’t need to set the inserted_at and updated_at fields, as Ecto would generate those values for us
when using Repo.insert/2, but we won’t be able to use this functionality as it takes a single record
at a time. We will be using Repo.insert_all/3, which is a bit more low-level function without
those nice features like filling in timestamps (sadly). Just to be crystal clear - Repo.insert/2 takes at
least a couple of seconds (on my machine) for the 1000+ symbols currently supported by Binance; on the
other hand, Repo.insert_all/3 will insert all of them in a couple of hundred milliseconds.
As our maps will differ only by the symbol column, we can first create a full map that will serve
as a template:

1 # /apps/naive/priv/seed_settings.exs
2 ...
3 base_settings = %{
4 symbol: "",
5 chunks: chunks,
6 budget: Decimal.from_float(budget),
7 buy_down_interval: Decimal.from_float(buy_down_interval),
8 profit_interval: Decimal.from_float(profit_interval),
9 rebuy_interval: Decimal.from_float(rebuy_interval),
10 status: "off",
11 inserted_at: timestamp,

12 updated_at: timestamp
13 }

We will now map over each of the retrieved symbols, injecting them into the base_settings map, and
push all of those to the Repo.insert_all/3 function:

1 # /apps/naive/priv/seed_settings.exs
2 ...
3 Logger.info("Inserting default settings for symbols")
4
5 maps = symbols
6 |> Enum.map(&(%{base_settings | symbol: &1["symbol"]}))
7
8 {count, nil} = Repo.insert_all(Settings, maps)
9
10 Logger.info("Inserted settings for #{count} symbols")
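The %{base_settings | symbol: ...} expression used above is Elixir’s map update syntax: it returns a copy of the template with only the given keys replaced. A standalone sketch with made-up symbol data (the field names are trimmed down for illustration):

```elixir
# A trimmed-down version of the seeding template.
base_settings = %{symbol: "", chunks: 5, status: "off"}

# Hypothetical shape of the symbols list returned by the exchange info call.
symbols = [%{"symbol" => "ETHUSDT"}, %{"symbol" => "BTCUSDT"}]

# %{map | key: value} copies the map, replacing only the given keys;
# it raises a KeyError if the key doesn't already exist in the map.
maps = Enum.map(symbols, &%{base_settings | symbol: &1["symbol"]})

["ETHUSDT", "BTCUSDT"] = Enum.map(maps, & &1.symbol)
```

Because the update syntax refuses to add unknown keys, a typo in a field name fails loudly instead of silently producing a row that Repo.insert_all/3 would reject.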

Update the Naive.Leader to fetch settings


The final step will be to update the Naive.Leader to fetch the settings from the database. At the top
of the module, add the following:

1 # /apps/naive/lib/naive/leader.ex
2 ...
3 alias Naive.Repo
4 alias Naive.Schema.Settings
5 ...

Now we need to modify fetch_symbol_settings/1 to fetch settings from the DB instead of the
hardcoded map. We will use Repo.get_by!/3, as we are unable to trade without settings. The
second trick used here is Map.from_struct/1, which is required as otherwise the result would be
the Naive.Schema.Settings struct (this would cause problems further down the line, as we are
iterating on the returned map and would get the protocol Enumerable not implemented for
%Naive.Schema.Settings{} error):

1 # /apps/naive/lib/naive/leader.ex
2 ...
3 defp fetch_symbol_settings(symbol) do
4 symbol_filters = fetch_symbol_filters(symbol)
5 settings = Repo.get_by!(Settings, symbol: symbol)
6
7 Map.merge(
8 symbol_filters,
9 settings |> Map.from_struct()
10 )
11 end
12 ...
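To make the Map.from_struct/1 point concrete, here is a self-contained sketch using a throwaway struct (the module and fields below are illustrative stand-ins, not the real Ecto schema):

```elixir
defmodule DemoSettings do
  # A stand-in for the settings schema struct - fields are illustrative.
  defstruct symbol: "XRPUSDT", chunks: 5
end

settings = %DemoSettings{}

# Structs don't implement the Enumerable protocol, so iterating over
# `settings` directly (e.g. Enum.map(settings, ...)) would raise
# Protocol.UndefinedError. Map.from_struct/1 drops the :__struct__
# key and returns a plain, enumerable map:
plain = Map.from_struct(settings)

false = Map.has_key?(plain, :__struct__)
[:chunks, :symbol] = plain |> Map.keys() |> Enum.sort()
```

Converting to a plain map before Map.merge/2 is what lets the leader keep treating the settings as ordinary key/value data.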

We can now run the seeding script to fill our database with the default settings:

1 $ cd apps/naive
2 $ mix run priv/seed_settings.exs
3 [Link].341 [info] Fetching exchange info from Binance to create trading settings
4 [Link].571 [info] Inserting default settings for symbols
5 [Link].645 [info] Inserted settings for 1276 symbols

We can verify that records were indeed inserted into the database by connecting to it using the psql
application:

1 $ psql -Upostgres -hlocalhost
2 Password for user postgres: # <= use 'postgres' password here
3 ...
4 postgres=# \c naive
5 You are now connected to database "naive" as user "postgres".
6 naive=# \x
7 Expanded display is on.
8 naive=# SELECT * FROM settings;
9 -[ RECORD 1 ]-----+-------------------------------------
10 id | 159c8f32-d571-47b2-b9d7-38bb42868043
11 symbol | ETHBTC
12 chunks | 5
13 budget | 1000
14 buy_down_interval | 0.0001
15 profit_interval | -0.0012
16 rebuy_interval | 0.001
17 status | off
18 inserted_at | 2021-02-02 [Link]

19 updated_at | 2021-02-02 [Link]


20
21 # press arrows to scroll, otherwise press `q`
22
23 naive=# SELECT COUNT(*) FROM settings;
24 -[ RECORD 1 ]
25 count | 1276
26
27 naive=# \q # <= to close the `psql`

That confirms that there are 1276 settings inside the database, which will allow us to continue trading.
We can check that by running our app inside iex (from the main project’s directory):

1 $ iex -S mix
2 ...
3 iex(1)> Naive.start_trading("NEOUSDT")
4 [Link].936 [info] Starting new supervision tree to trade on NEOUSDT
5 {:ok, #PID<0.378.0>}
6 [Link].584 [info] Initializing new trader(1612293637000) for NEOUSDT

The above log messages confirm that the Naive.Leader was able to fetch settings from the database,
put them into its state, and pass them on to the new trader.
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github³⁶
³⁶[Link]
Chapter 11 - Supervise and autostart
streaming
Objectives
• describe and design the required functionality
• register the Streamer.Binance processes with names
• set up ecto inside the streamer app
• create and migrate the db
• seed default settings
• implement the supervision tree and start streaming functionality
• implement the stop functionality
• implement the autostart streaming functionality
• test the implementation

Describe and design the required functionality


At this moment there’s no supervision around the streamer processes - whenever an error occurs
inside a Streamer.Binance process, it will die and never come back up.
That’s less than perfect, but we can bring supervisors to the rescue.
We will create a new Streamer.DynamicStreamerSupervisor module inside our streamer application
that will supervise the Streamer.Binance processes.
Next, we will consider a list of functionalities that we would like it to provide:

• start streaming. This will require a new Streamer.Binance process started under the
Streamer.DynamicStreamerSupervisor. We will put the logic responsible for starting that process inside
the Streamer.DynamicStreamerSupervisor module.
• stop streaming. To be able to stop the Streamer.Binance process streaming on a specific symbol,
we will need to know that process’ PID based only on the symbol string (ie. “ETHUSDT”). To make
that possible, we will need to register every Streamer.Binance process with a name that we
will be able to “reverse-engineer” based only on the symbol string, for example: :"#{__MODULE__}-#{symbol}"

• autostart streaming. At the start of streaming on a symbol, we should persist that action as the
symbol’s streaming setting inside the database. We will need to generate a new Repo, configure it,
and create and migrate the DB (just like in the last chapter for the naive app) to be able to retrieve
that list. We will write logic that will fetch the settings of the symbols, autostart the ones that are
enabled, and place all that logic inside the Streamer.DynamicStreamerSupervisor module. We
will introduce a Task³⁷ child process that will utilize the logic from the Streamer.DynamicStreamerSupervisor
to fetch those enabled symbols and start Streamer.Binance processes on startup - we will
describe all of this separately in its own section in this chapter.
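The “reverse-engineering” of a process name works because interpolating a module into a string yields its full "Elixir."-prefixed name, so the same atom can always be rebuilt from just the symbol string. A minimal sketch (we interpolate the module name literally here; the module itself doesn’t need to exist for the atom to be built):

```elixir
symbol = "ETHUSDT"

# Inside the streamer module, :"#{__MODULE__}-#{symbol}" would produce
# the atom below, because a module interpolates to "Elixir." plus its
# dotted name.
name = :"#{Streamer.Binance}-#{symbol}"

:"Elixir.Streamer.Binance-ETHUSDT" = name

# Given only the symbol string, the exact same atom can be rebuilt
# later to look the process up - no PID bookkeeping required.
true = name == :"Elixir.Streamer.Binance-#{symbol}"
```

This is the property the stop-streaming functionality will rely on later in the chapter.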

Register the Streamer.Binance processes with names


To be able to perform start/stop streaming on a symbol, we will first need to be able to figure out the
PID of the Streamer.Binance process that is streaming that symbol.
The first change that we need to apply will be to register the Streamer.Binance processes with names
by passing the 4th argument to the WebSockex.start_link/4 function:

1 # /apps/streamer/lib/streamer/binance.ex
2 def start_link(symbol) do
3 lowercased_symbol = String.downcase(symbol) # <= separate variable
4
5 WebSockex.start_link(
6 "#{@stream_endpoint}#{lowercased_symbol}@trade", # <= lowercase symbol
7 __MODULE__,
8 nil,
9 name: :"#{__MODULE__}-#{symbol}" # <= uppercase symbol
10 )
11 end

A few things worth mentioning here:

• we are getting the uppercase symbol, but inside the URL we need to use a lowercase symbol, so
we will introduce a new separate variable to be used in the URL
• we are registering the process using the uppercase symbol so the name will remain consistent
with the naive application’s processes
• to register the process, we are sending a keyword list as the 4th argument to the custom start_link/4
function of the WebSockex module (link to the source³⁸ - again, no need to be afraid of reading the
source code in Elixir, that’s the beauty of it)
³⁷[Link]
³⁸[Link]

Set up ecto inside the streamer app


In the same fashion as in the last chapter, we will need to set up the database inside the streamer
app. We will use the same Postgres server (docker container) that we’ve set up in the
last chapter, just a separate database, so there’s no need to update the docker-compose.yml file.
As previously, the first step will be to add the ecto modules and Postgres-related packages into
deps inside the mix.exs file of the streamer app. Additionally, we will add the binance module that
we will use to fetch all symbols supported by the exchange (to generate default settings, as we’ve
done for the naive application). We are unable to use the BinanceMock, as this would cause a circular
dependency (BinanceMock depends on the streamer app):

1 # /apps/streamer/mix.exs
2 ...
3 defp deps do
4 [
5 {:binance, "~> 0.7.1"}, # <= used to retrieve symbols list(exchangeInfo)
6 {:ecto_sql, "~> 3.0"}, # <= added dependency
7 {:ecto_enum, "~> 1.4"}, # <= added dependency
8 {:jason, "~> 1.2"},
9 {:phoenix_pubsub, "~> 2.0"},
10 {:postgrex, ">= 0.0.0"}, # <= added dependency
11 {:websockex, "~> 0.4.2"}
12 ]
13 end

Run mix deps.get to install the new dependencies.


We can now use the ecto generator to add an ecto repository to the Streamer application:

1 $ cd apps/streamer
2 $ mix ecto.gen.repo -r Streamer.Repo
3 * creating lib/streamer
4 * creating lib/streamer/repo.ex
5 * updating ../../config/config.exs
6 ...

Update the config to match the access details of our Postgres docker instance:



1 # /config/config.exs
2 config :streamer, # <= added line
3 ecto_repos: [Streamer.Repo] # <= added line
4
5 config :streamer, Streamer.Repo,
6 database: "streamer", # <= database updated
7 username: "postgres", # <= username updated
8 password: "postgres", # <= password updated
9 hostname: "localhost"

The last step will be to update the children list of the Streamer.Application module:

1 # /apps/streamer/lib/streamer/application.ex
2 ...
3 def start(_type, _args) do
4 children = [
5 {Streamer.Repo, []}, # <= repo added
6 {
7 Phoenix.PubSub,
8 name: Streamer.PubSub, adapter_name: Phoenix.PubSub.PG2
9 }
10 ]
11 ...

Create and migrate the db


We can now create a new streamer database using the mix tool. After that, we will be able to generate
a migration file that will create the settings table:

1 $ mix ecto.create -r Streamer.Repo
2 The database for Streamer.Repo has been created
3 $ cd apps/streamer
4 $ mix ecto.gen.migration create_settings
5 * creating priv/repo/migrations
6 * creating priv/repo/migrations/20210203184805_create_settings.exs

We can safely start just with id, symbol and status columns, where the last one will follow the same
enum idea from the previous chapter:

1 # /apps/streamer/priv/repo/migrations/20210203184805_create_settings.exs
2 defmodule Streamer.Repo.Migrations.CreateSettings do
3 use Ecto.Migration
4
5 alias Streamer.Schema.StreamingStatusEnum
6
7 def change do
8 StreamingStatusEnum.create_type()
9
10 create table(:settings, primary_key: false) do
11 add(:id, :uuid, primary_key: true)
12 add(:symbol, :text, null: false)
13 add(:status, StreamingStatusEnum.type(), default: "off", null: false)
14
15 timestamps()
16 end
17
18 create(unique_index(:settings, [:symbol]))
19 end
20 end

That finishes our work on the migration file. We need to add the StreamingStatusEnum in the
same way as in the last chapter: create a schema directory inside the apps/streamer/lib/streamer
directory with a new file called streaming_status_enum.ex and place the below logic (defining the enum)
in it:

1 # /apps/streamer/lib/streamer/schema/streaming_status_enum.ex
2 import EctoEnum
3
4 defenum(Streamer.Schema.StreamingStatusEnum, :streaming_status, [:on, :off])

Let’s run the migration to create the table, unique index, and the enum:

1 $ mix ecto.migrate
2 [Link].850 [info] == Running 20210203184805 Streamer.Repo.Migrations.CreateSettings.\
3 change/0 forward
4 [Link].850 [info] execute "CREATE TYPE public.streaming_status AS ENUM ('on', 'of\
5 f')"
6 [Link].851 [info] create table settings
7 [Link].912 [info] create index settings_symbol_index
8 [Link].932 [info] == Migrated 20210203184805 in 0.0s

We can now create a schema file for the settings table. Inside the /apps/streamer/lib/streamer/schema
directory, create a file called settings.ex:

1 # /apps/streamer/lib/streamer/schema/settings.ex
2 defmodule Streamer.Schema.Settings do
3 use Ecto.Schema
4
5 alias Streamer.Schema.StreamingStatusEnum
6
7 @primary_key {:id, :binary_id, autogenerate: true}
8
9 schema "settings" do
10 field(:symbol, :string)
11 field(:status, StreamingStatusEnum)
12
13 timestamps()
14 end
15 end

We are now ready to query the table but first, we need to insert the default settings into the database.

Seed default settings


As with the settings inside the naive application, we will fetch all symbols from Binance and bulk-
insert them into the database.
First, let’s create a new file called seed_settings.exs inside the apps/streamer/priv directory. As
this file is nearly the same as the one from the last chapter, I will skip the lengthy explanation - this
is the script:

1 # /apps/streamer/priv/seed_settings.exs
2 require Logger
3
4 alias Decimal
5 alias Streamer.Repo
6 alias Streamer.Schema.Settings
7
8 Logger.info("Fetching exchange info from Binance to create streaming settings")
9
10 {:ok, %{symbols: symbols}} = Binance.get_exchange_info()
11
12 timestamp = NaiveDateTime.utc_now()
13 |> NaiveDateTime.truncate(:second)
14
15 base_settings = %{

16 symbol: "",
17 status: "off",
18 inserted_at: timestamp,
19 updated_at: timestamp
20 }
21
22 Logger.info("Inserting default settings for symbols")
23
24 maps = symbols
25 |> Enum.map(&(%{base_settings | symbol: &1["symbol"]}))
26
27 {count, nil} = Repo.insert_all(Settings, maps)
28
29 Logger.info("Inserted settings for #{count} symbols")

Don’t forget to run the seeding script before progressing forward:

1 $ cd apps/streamer
2 $ mix run priv/seed_settings.exs
3 [Link].675 [info] Fetching exchange info from Binance to create streaming settings
4 [Link].386 [info] Inserting default settings for symbols
5 [Link].448 [info] Inserted settings for 1277 symbols

Implement the supervision tree and start streaming functionality


Let’s start by creating a new file called dynamic_streamer_supervisor.ex inside the /apps/streamer/lib/streamer
directory. We will use the default implementation from the docs³⁹ (updated with correct module
and process names):

1 # /apps/streamer/lib/streamer/dynamic_streamer_supervisor.ex
2 defmodule Streamer.DynamicStreamerSupervisor do
3 use DynamicSupervisor
4
5 def start_link(init_arg) do
6 DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
7 end
8
9 def init(_init_arg) do
10 DynamicSupervisor.init(strategy: :one_for_one)
³⁹[Link]

11 end
12 end

Next, we will add the start_streaming/1 function at the bottom of the Streamer.DynamicStreamerSupervisor
module:

1 # /apps/streamer/lib/streamer/dynamic_streamer_supervisor.ex
2 ...
3 def start_streaming(symbol) when is_binary(symbol) do
4 symbol = String.upcase(symbol)
5
6 case get_pid(symbol) do
7 nil ->
8 Logger.info("Starting streaming on #{symbol}")
9 {:ok, _settings} = update_streaming_status(symbol, "on")
10 {:ok, _pid} = start_streamer(symbol)
11
12 pid ->
13 Logger.info("Streaming on #{symbol} already started")
14 {:ok, _settings} = update_streaming_status(symbol, "on")
15 {:ok, pid}
16 end
17 end

To unpack the above - we are checking whether there is a streamer process already running for the passed
symbol, and based on the result of that check, we either start a new streaming process (and update the
symbol’s settings) or just update the symbol’s settings.
Inside the start_streaming/1 function, we are using 3 helper functions that we need to add at the
bottom of the file:

1 # /apps/streamer/lib/streamer/dynamic_streamer_supervisor.ex
2 defp get_pid(symbol) do
3 Process.whereis(:"Elixir.Streamer.Binance-#{symbol}")
4 end
5
6 defp update_streaming_status(symbol, status)
7 when is_binary(symbol) and is_binary(status) do
8 Repo.get_by(Settings, symbol: symbol)
9 |> Ecto.Changeset.change(%{status: status})
10 |> Repo.update()
11 end
12

13 defp start_streamer(symbol) do
14 DynamicSupervisor.start_child(
15 Streamer.DynamicStreamerSupervisor,
16 {Streamer.Binance, symbol}
17 )
18 end

The above functions are quite self-explanatory: get_pid/1 is a convenience wrapper, update_streaming_status/2
will update the status field for the passed symbol, and start_streamer/1 will
instruct the Streamer.DynamicStreamerSupervisor to start a new Streamer.Binance process with the
symbol passed as the first argument.
The last step to get the above functions to work (and future ones in this module) will be to add a
require, an import, and a few aliases at the top of the module:

1 # /apps/streamer/lib/streamer/dynamic_streamer_supervisor.ex
2 require Logger
3
4 import Ecto.Query, only: [from: 2]
5
6 alias Streamer.Repo
7 alias Streamer.Schema.Settings

As we added the new start_streaming/1 logic inside the Streamer.DynamicStreamerSupervisor, we
need to replace the start_streaming/1 implementation inside the Streamer module:

1 # /apps/streamer/lib/streamer.ex
2 ...
3 alias Streamer.DynamicStreamerSupervisor
4
5 defdelegate start_streaming(symbol), to: DynamicStreamerSupervisor

As we don’t need to put any logic inside the Streamer.start_streaming/1 function, we can just
delegate the call straight to the Streamer.DynamicStreamerSupervisor module.
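For readers new to defdelegate, here is a self-contained sketch of what the macro generates; the two modules below are throwaway stand-ins, not the book’s actual modules:

```elixir
defmodule DemoSupervisorApi do
  # Imagine this module holding the real start_streaming/1 logic.
  def start_streaming(symbol), do: {:ok, symbol}
end

defmodule DemoFacade do
  # defdelegate generates DemoFacade.start_streaming/1, whose body
  # simply forwards the call to DemoSupervisorApi.start_streaming/1.
  defdelegate start_streaming(symbol), to: DemoSupervisorApi
end

{:ok, "ETHUSDT"} = DemoFacade.start_streaming("ETHUSDT")
```

This keeps a small public API on the facade module while the implementation lives elsewhere - exactly the shape the Streamer module takes here.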
The last step will be to append the Streamer.DynamicStreamerSupervisor to the children list of the
Streamer.Application:

1 # /apps/streamer/lib/streamer/application.ex
2 def start(_type, _args) do
3 children = [
4 ...
5 {Streamer.DynamicStreamerSupervisor, []}
6 ]

At this moment our supervision tree already works, and all streamer processes are being monitored
by the Streamer.DynamicStreamerSupervisor:

DynamicStreamerSupervisor with two named streamers

Implement the stop functionality


As we can see, we are now registering the Streamer.Binance processes with names that contain
their symbols. We will be able to retrieve the PIDs of those registered processes simply by passing
the symbol string (ie. “ETHUSDT”) into get_pid/1, which will allow us to then request termination
of those processes by the Streamer.DynamicStreamerSupervisor.
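The lookup relies on the standard process registry: a process registered under a name can be found with Process.whereis/1, which returns nil when nothing is registered under that name. A minimal sketch, using a plain Agent as a stand-in for the WebSockex process (the name atom below is made up for the demo):

```elixir
symbol = "ETHUSDT"
# Illustrative name - the real code derives it from __MODULE__.
name = :"demo_streamer-#{symbol}"

# Nothing is registered under that name yet:
nil = Process.whereis(name)

# Start any process under the name (an Agent stands in for the streamer):
{:ok, pid} = Agent.start_link(fn -> :ok end, name: name)

# Now the PID is recoverable from the symbol string alone:
^pid = Process.whereis(name)
```

That recovered PID is exactly what gets handed to DynamicSupervisor.terminate_child/2 below.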
Let’s write the stop_streaming/1 logic inside the Streamer.DynamicStreamerSupervisor module (put
it above the first private function):

1 # /apps/streamer/lib/streamer/dynamic_streamer_supervisor.ex
2 def stop_streaming(symbol) when is_binary(symbol) do
3 symbol = String.upcase(symbol)
4
5 case get_pid(symbol) do
6 nil ->
7 Logger.info("Streaming on #{symbol} already stopped")
8 {:ok, _settings} = update_streaming_status(symbol, "off")
9
10 pid ->
11 Logger.info("Stopping streaming on #{symbol}")
12
13 :ok =
14 DynamicSupervisor.terminate_child(
15 Streamer.DynamicStreamerSupervisor,
16 pid
17 )
18
19 {:ok, _settings} = update_streaming_status(symbol, "off")
20 end
21 end

stop_streaming/1 looks very similar to start_streaming/1: we check whether there is already a Streamer.Binance process registered for that symbol, and we either ask the Streamer.DynamicStreamerSupervisor to terminate it for us (using the DynamicSupervisor.terminate_child/2 function, plus update the status) or just update the status to be off.
We need to update the Streamer module to provide the interface to stop streaming on a symbol:
We need to update the Streamer module to provide the interface to stop streaming on a symbol:

1 # /apps/streamer/lib/streamer.ex
2 ...
3 defdelegate stop_streaming(symbol), to: DynamicStreamerSupervisor
4 ...

Implement the autostart streaming functionality


Currently, whenever we shut down the Elixir app, settings persist in the database, but streamers are not started on the next init.
To fix this, we will add autostart_streaming/0 inside the Streamer.DynamicStreamerSupervisor.
Note: it’s very important to differentiate between the module and the process. We will put our autostarting logic inside the module, but the Streamer.DynamicStreamerSupervisor process won’t run it.
We will introduce a new Task⁴⁰ process that will execute all the autostarting logic.
That will cover the problem of the Supervisor executing too much business logic (as the Task will
execute it), but how will we supervise them together?
At init, both will start: first the Streamer.DynamicStreamerSupervisor, and then the Task will ask it to start multiple children, and that will work fine. The problem occurs when the Streamer.DynamicStreamerSupervisor dies for any reason. Currently, it’s supervised using the one_for_one strategy (and the Task would be as well), which means that it will get started again by the application’s top-level supervisor, but at that moment the “autostarting” Task won’t be there anymore to start streaming on all enabled symbols.
We can clearly see that whenever the Streamer.DynamicStreamerSupervisor dies, it needs to be started again together with the “autostart” Task, and this won’t fit our current one_for_one strategy.
⁴⁰[Link]

In cases like this, a new level of supervision needs to be introduced that will have a different supervision strategy for those “coupled” processes. We will rename the registered process name of the application’s top-level supervisor (currently Streamer.Supervisor) to Streamer.Application.
Then we will introduce a new Streamer.Supervisor module and register it under the freed-up name.
We will attach both the Streamer.DynamicStreamerSupervisor and the Task to the new Streamer.Supervisor and assign it the rest_for_one strategy, which will restart the Task whenever the Streamer.DynamicStreamerSupervisor dies:

Updated supervision tree with additional supervisor and renamed Application process

Let’s start by creating the autostart_streaming/0 functionality inside the Streamer.DynamicStreamerSupervisor:

1 # /apps/streamer/lib/streamer/dynamic_streamer_supervisor.ex
2
3 # add below after the `init/1` function
4 def autostart_streaming() do
5   fetch_symbols_to_stream()
6   |> Enum.each(&start_streaming/1)
7 end
8
9 # and this at the end of the module
10 defp fetch_symbols_to_stream() do
11   Repo.all(
12     from(s in Settings,
13       where: s.status == "on",
14       select: s.symbol
15     )
16   )
17 end

The autostart_streaming/0 function fetches all symbols from the settings table with status == "on", then passes them one by one into the start_streaming/1 function using Enum.each/2.
We can now focus on referring to the above autostarting logic inside the new supervisor that we will create now. Let’s start by creating a new file called supervisor.ex inside the /apps/streamer/lib/streamer/ directory and filling it with the default Supervisor⁴¹ implementation:

1 # /apps/streamer/lib/streamer/supervisor.ex
2 defmodule Streamer.Supervisor do # <= updated module name
3   use Supervisor
4
5   def start_link(init_arg) do
6     Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
7   end
8
9   def init(_init_arg) do
10     children = [
11
12     ]
13
14     Supervisor.init(children, strategy: :one_for_one)
15   end
16 end

We can now update the strategy to rest_for_one:

1 # /apps/streamer/lib/streamer/supervisor.ex
2 def init(_init_arg) do
3   ...
4   Supervisor.init(children, strategy: :rest_for_one) # <= strategy updated
5 end

The last step inside our new supervisor will be to add 2 children: the Streamer.DynamicStreamerSupervisor and the Task (that will autostart the symbol streamers):

⁴¹[Link]

1 # /apps/streamer/lib/streamer/supervisor.ex
2 def init(_init_arg) do
3   children = [
4     {Streamer.DynamicStreamerSupervisor, []},
5     {Task,
6      fn ->
7        Streamer.DynamicStreamerSupervisor.autostart_streaming()
8      end}
9   ]
10   ...
11 end

The final update in this chapter will be to replace the Streamer.DynamicStreamerSupervisor with the new Streamer.Supervisor in the children list of the Streamer.Application module, and to update the name that the application process registers under:

1 # /apps/streamer/lib/streamer/application.ex
2 ...
3 children = [
4   {Streamer.Repo, []},
5   {
6     Phoenix.PubSub,
7     name: Streamer.PubSub, adapter_name: Phoenix.PubSub.PG2
8   },
9   {Streamer.Supervisor, []} # <= updated supervisor
10 ]
11
12 opts = [strategy: :one_for_one, name: Streamer.Application] # <= updated name
13 ...

Test the implementation


Let’s start an iex session and call the start_streaming/1 function twice for two different symbols
and then exit using double Ctrl+c:

1 $ iex -S mix
2 ...
3 iex(1)> Streamer.start_streaming("ethusdt")
4 [Link].809 [info] Starting streaming on ETHUSDT
5 {:ok, #PID<0.370.0>}
6 iex(2)> Streamer.start_streaming("neousdt")
7 [Link].288 [info] Starting streaming on NEOUSDT
8 {:ok, #PID<0.377.0>}

Now, open a new iex session and look at the output:

1 $ iex -S mix
2 ...
3 iex(1)>
4 [Link].920 [info] Starting streaming on ETHUSDT
5 [Link].306 [info] Starting streaming on NEOUSDT

We can also confirm that the streamer processes are there by using :observer.start():

Updated supervision tree with two autostarted streamers and new supervisor
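The :observer GUI isn’t the only option - as a quick terminal-only check, we could ask the dynamic supervisor directly (a sketch; the exact PIDs will differ, and Streamer.Binance is assumed to be the worker module from the earlier chapters):

```elixir
iex(3)> DynamicSupervisor.which_children(Streamer.DynamicStreamerSupervisor)
[
  {:undefined, #PID<0.370.0>, :worker, [Streamer.Binance]},
  {:undefined, #PID<0.377.0>, :worker, [Streamer.Binance]}
]
```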

Inside the same iex session run the following:

1 iex(5)> Streamer.stop_streaming("neousdt")
2 [Link].205 [info] Stopping streaming on NEOUSDT
3 {:ok,
4  %Streamer.Schema.Settings{
5    ...
6  }}
7 iex(6)> Streamer.stop_streaming("ethusdt")
8 [Link].553 [info] Stopping streaming on ETHUSDT
9 {:ok,
10  %Streamer.Schema.Settings{
11    ...
12  }}

Stop the iex session and start a new one - streamers shouldn’t be autostarted anymore.
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github⁴²
⁴²[Link]
Chapter 12 - Start, stop, shutdown
and autostart trading
Objectives
• describe and design the required functionality
• (re-)implement the start trading functionality
• implement the stop trading functionality
• implement the autostart trading functionality
• implement the shutdown trading functionality
• test the implementation

Describe and design the required functionality


In the 10th chapter, we introduced the Postgres database inside the naive application, together with per-symbol settings.
In this chapter, we will progress forward to provide additional trading functionality similar to the functionality implemented in the last chapter for the streaming application:

• stop trading - as the Naive.SymbolSupervisor processes are registered with names that can be easily reverse engineered, we should be able to utilize the Process.whereis/1 function to retrieve the PIDs and instruct the Naive.DynamicSymbolSupervisor to terminate those child processes. Again, we need to put that logic somewhere, so we will implement the Naive.DynamicSymbolSupervisor as a full module using the DynamicSupervisor behaviour.
• start trading - as our Naive.DynamicSymbolSupervisor will now be a module, we will be able to remove the start_trading/1 implementation from the Naive module and reimplement it inside the Naive.DynamicSymbolSupervisor module. It will follow the same pattern of checking for a PID, starting the Naive.SymbolSupervisor process and flipping the status flag inside the settings table’s row for that symbol.
• shutdown trading - sometimes abruptly stopping trading won’t be the best solution; it would be better to allow the Naive.Trader processes to finish their ongoing trades. To be able to do that, we will need to inform the Naive.Leader process assigned to the symbol that the settings for that symbol have been updated, which should cause the Naive.Leader process to withhold starting new Naive.Trader processes and terminate the whole tree when the last trader finishes.

• autostart trading - this will be a very similar implementation to the one from the last chapter. It will require introducing a new supervisor (we will follow the same naming convention: rename the registered process name of the application’s top-level supervisor to Naive.Application, create a new supervisor called Naive.Supervisor) and utilizing the Task process to execute the autostarting logic.

Final supervision tree after adding the autostarting Task and the Naive.Supervisor

(Re-)Implement the start trading functionality


To (re-)implement the start_trading/1 functionality, we will need to create a new file called
dynamic_symbol_supervisor.ex inside the /apps/naive/lib/naive directory that will use the
DynamicSupervisor behaviour. Previously we have been using the default DynamicSupervisor
implementation (one of the children of the Naive.Application - to be substituted with the below module):

1 # /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
2 defmodule Naive.DynamicSymbolSupervisor do # <= module updated
3   use DynamicSupervisor
4
5   require Logger # <= Logger added
6
7   import Ecto.Query, only: [from: 2] # <= added for querying
8
9   alias Naive.Repo # <= added for querying/updating
10   alias Naive.Schema.Settings # <= added for querying/updating
11
12   def start_link(init_arg) do
13     DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
14   end
15
16   def init(_init_arg) do
17     DynamicSupervisor.init(strategy: :one_for_one)
18   end
19 end

The above code is the default implementation from the DynamicSupervisor docs⁴³ with some additional imports, requires, and aliases, as we will use them in this chapter.
Our start_trading/1 implementation is almost the same as the one for the streamer application from the last chapter:

⁴³[Link]

1 # /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
2 ...
3 def start_trading(symbol) when is_binary(symbol) do
4   symbol = String.upcase(symbol)
5
6   case get_pid(symbol) do
7     nil ->
8       Logger.info("Starting trading of #{symbol}")
9       {:ok, _settings} = update_trading_status(symbol, "on")
10       {:ok, _pid} = start_symbol_supervisor(symbol)
11
12     pid ->
13       Logger.warn("Trading on #{symbol} already started")
14       {:ok, _settings} = update_trading_status(symbol, "on")
15       {:ok, pid}
16   end
17 end
18 ...

together with additional helper functions:

1 # /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
2 defp get_pid(symbol) do
3   Process.whereis(:"Elixir.Naive.SymbolSupervisor-#{symbol}")
4 end
5
6 defp update_trading_status(symbol, status)
7      when is_binary(symbol) and is_binary(status) do
8   Repo.get_by(Settings, symbol: symbol)
9   |> Ecto.Changeset.change(%{status: status})
10   |> Repo.update()
11 end
12
13 defp start_symbol_supervisor(symbol) do
14   DynamicSupervisor.start_child(
15     Naive.DynamicSymbolSupervisor,
16     {Naive.SymbolSupervisor, symbol}
17   )
18 end

Both the implementation and the helper functions are almost the same as the ones inside the streamer application. It could be tempting to abstract some of the logic away, but remember that we should treat all applications in our umbrella project as standalone services that should not share any code if possible (we broke that rule for the TradeEvent struct from the streamer app, but we could easily just make a lib with that struct that would be shared between the two applications). I would shy away from sharing any business logic between applications in the umbrella project.
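As an illustration of that “lib with the struct” idea, the extraction could look like the sketch below (the app and module names are hypothetical, and the fields are assumed from the Binance trade event payload used in earlier chapters):

```elixir
# a tiny umbrella app (e.g. `core`) holding only shared data structures,
# which both `streamer` and `naive` can depend on via `in_umbrella: true`
defmodule Core.Struct.TradeEvent do
  # fields assumed from the Binance trade event payload
  defstruct [
    :event_type,
    :event_time,
    :symbol,
    :price,
    :quantity,
    :trade_id
  ]
end
```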
There are two additional places where we need to make updates to get our start_trading/1 to work
again:

• we need to update the children list inside the Naive.Application:

1 # /apps/naive/lib/naive/application.ex
2 ...
3 children = [
4   {Naive.Repo, []},
5   {Naive.DynamicSymbolSupervisor, []} # <= replacement of DynamicSupervisor
6 ]

• we need to replace the start_trading/1 implementation inside the Naive module with the defdelegate macro (as we don’t have any logic to run there):

1 # /apps/naive/lib/naive.ex
2 ...
3 alias Naive.DynamicSymbolSupervisor
4
5 defdelegate start_trading(symbol), to: DynamicSymbolSupervisor
6 ...

At this moment, we are again able to use the Naive.start_trading/1 function to start trading on a symbol (behind the scenes, it will use the logic from the new Naive.DynamicSymbolSupervisor module).

Implement the stop trading functionality


Stopping trading will require a change in two places; first, inside the Naive.DynamicSymbolSupervisor, where we will place the termination logic:

1 # /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
2 ...
3 def stop_trading(symbol) when is_binary(symbol) do
4   symbol = String.upcase(symbol)
5
6   case get_pid(symbol) do
7     nil ->
8       Logger.warn("Trading on #{symbol} already stopped")
9       {:ok, _settings} = update_trading_status(symbol, "off")
10
11     pid ->
12       Logger.info("Stopping trading of #{symbol}")
13
14       :ok =
15         DynamicSupervisor.terminate_child(
16           Naive.DynamicSymbolSupervisor,
17           pid
18         )
19
20       {:ok, _settings} = update_trading_status(symbol, "off")
21   end
22 end
23 ...

The second change we need to make is to create a forwarding interface using defdelegate inside
the Naive module:

1 # /apps/naive/lib/naive.ex
2 ...
3 defdelegate stop_trading(symbol), to: DynamicSymbolSupervisor
4 ...

That pretty much finishes the stop_trading/1 functionality. We are now able to start and stop (which was previously not available) trading on a symbol.

Implement the autostart trading functionality


To implement the autostarting, we will need to (in a similar fashion as in the last chapter) add a new supervision level dedicated to supervising the Naive.DynamicSymbolSupervisor and the “autostarting” Task.
Let’s create a new file called supervisor.ex inside the /apps/naive/lib/naive directory and (as in the last chapter) add the Naive.DynamicSymbolSupervisor and the Task to its children list. We will also update the supervision strategy to :rest_for_one:

1 # /apps/naive/lib/naive/supervisor.ex
2 defmodule Naive.Supervisor do
3   use Supervisor
4
5   def start_link(init_arg) do
6     Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
7   end
8
9   def init(_init_arg) do
10     children = [
11       {Naive.DynamicSymbolSupervisor, []}, # <= added
12       {Task, # <= added
13        fn -> # <= added
14          Naive.DynamicSymbolSupervisor.autostart_trading() # <= added
15        end} # <= added
16     ]
17
18     Supervisor.init(children, strategy: :rest_for_one) # <= strategy updated
19   end
20 end

Now we need to swap the Naive.DynamicSymbolSupervisor for the Naive.Supervisor in the children list of the Naive.Application, as well as update the name that the application process registers under:

1 # /apps/naive/lib/naive/application.ex
2 ...
3 def start(_type, _args) do
4   children = [
5     {Naive.Repo, []},
6     {Naive.Supervisor, []} # <= replacement for the DynamicSymbolSupervisor
7   ]
8
9   opts = [strategy: :one_for_one, name: Naive.Application] # <= name updated

Finally, we need to implement autostart_trading/0 inside the Naive.DynamicSymbolSupervisor module, as our new Task refers to it:

1 # /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
2 ...
3 # add the below function after the `init/1` function
4 def autostart_trading() do
5   fetch_symbols_to_trade()
6   |> Enum.each(&start_trading/1)
7 end
8
9 ...
10
11 # and this helper at the end of the module
12 defp fetch_symbols_to_trade() do
13   Repo.all(
14     from(s in Settings,
15       where: s.status == "on",
16       select: s.symbol
17     )
18   )
19 end
20 ...

These are the same (excluding the updated function names) as inside the streamer application. We are fetching the enabled symbols and starting new Naive.SymbolSupervisor processes for each one.
At this moment, we can already see our implementation in action:

Trading on two symbols autostarted, new Naive.Supervisor and renamed Naive.Application process

At this moment we are able to test the current implementation inside the iex:

1 $ iex -S mix
2 ...
3 iex(1)> Naive.start_trading("ethusdt")
4 [Link].207 [info] Starting trading of ETHUSDT
5 [Link].261 [info] Starting new supervision tree to trade on ETHUSDT
6 {:ok, #PID<0.372.0>}
7 [Link].344 [info] Initializing new trader(1612647333342) for ETHUSDT
8 iex(3)> Naive.start_trading("neousdt")
9 [Link].128 [info] Starting trading of NEOUSDT

10 [Link].169 [info] Starting new supervision tree to trade on NEOUSDT
11 {:ok, #PID<0.383.0>}
12 [Link].007 [info] Initializing new trader(1612647356007) for NEOUSDT
13 [Link].699 [info] Stopping trading of NEOUSDT
14 {:ok,
15  %Naive.Schema.Settings{
16    ...
17  }}

We can now exit the iex and start a new one:

1 $ iex -S mix
2 ...
3 [Link].894 [info] Starting trading of ETHUSDT
4 [Link].938 [info] Starting new supervision tree to trade on ETHUSDT
5 [Link].786 [info] Initializing new trader(1612647558784) for ETHUSDT
6 iex(1)>

The above logs confirm that the naive application autostarts the previously enabled symbols (using the start_trading/1 function), and that stop_trading/1 updates the status inside the database (so the symbol isn’t autostarted at the next initialization).

Implement the shutdown trading functionality


Last but not least, we will move on to the shutdown_trading/1 functionality. Let’s start with the simplest part, which is delegating the function call to the Naive.DynamicSymbolSupervisor module from the Naive module (the interface):

1 # /apps/naive/lib/naive.ex
2 ...
3 defdelegate shutdown_trading(symbol), to: DynamicSymbolSupervisor
4 ...

Next, we will create a shutdown_trading/1 function inside the Naive.DynamicSymbolSupervisor module, where we will check whether there is any trading going on for that symbol (the same as for start/stop), and in case trading is happening, we will inform the Naive.Leader process handling that symbol that the settings have been updated:

1 # /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
2 ...
3 def shutdown_trading(symbol) when is_binary(symbol) do
4   symbol = String.upcase(symbol)
5
6   case get_pid(symbol) do
7     nil ->
8       Logger.warn("Trading on #{symbol} already stopped")
9       {:ok, _settings} = update_trading_status(symbol, "off")
10
11     _pid ->
12       Logger.info("Shutdown of trading on #{symbol} initialized")
13       {:ok, settings} = update_trading_status(symbol, "shutdown")
14       Naive.Leader.notify(:settings_updated, settings)
15       {:ok, settings}
16   end
17 end
18 ...

The crucial part of the implementation above is the notify(:settings_updated, settings) call, where we inform the Naive.Leader process that it needs to update the trading settings.
Currently, the Naive.Leader module does not support updating the settings after startup - let’s add a new interface function together with a callback function that will handle the settings updating:

1 # /apps/naive/lib/naive/leader.ex
2 ...
3 # add the below function as the last clause of the `notify` function
4 def notify(:settings_updated, settings) do
5   GenServer.call(
6     :"#{__MODULE__}-#{settings.symbol}",
7     {:update_settings, settings}
8   )
9 end
10
11 # add the below handler function as the last clause of the `handle_call` function
12 def handle_call(
13       {:update_settings, new_settings},
14       _,
15       state
16     ) do
17   {:reply, :ok, %{state | settings: new_settings}}
18 end

Ok, we have a way to update the settings of the Naive.Leader process “on the go”, but what effects should the shutdown state have on the Naive.Leader’s actions?
There are two places that require modification:

• whenever the Naive.Trader process finishes the trade cycle, the Naive.Leader process should not start a new one, as well as check whether that was the last trader process; if that was the case, it needs to call the Naive.stop_trading/1 function with its symbol to terminate the whole supervision tree for that symbol
• whenever the Naive.Leader process receives a rebuy notification, it should just ignore it when the symbol is in the shutdown state.

Let’s look at the updated implementation of the “end of trade” handler:

1 # /apps/naive/lib/naive/leader.ex
2 ...
3 def handle_info(
4       {:DOWN, _ref, :process, trader_pid, :normal},
5       %{traders: traders, symbol: symbol, settings: settings} = state
6     ) do
7   Logger.info("#{symbol} trader finished trade - restarting")
8
9   case Enum.find_index(traders, &(&1.pid == trader_pid)) do
10     nil ->
11       Logger.warn(
12         "Tried to restart finished #{symbol} " <>
13           "trader that leader is not aware of"
14       )
15
16       if settings.status == "shutdown" and traders == [] do # <= additional check
17         Naive.stop_trading(state.symbol)
18       end
19
20       {:noreply, state}
21
22     index ->
23       new_traders =
24         if settings.status == "shutdown" do # <= refactored code
25           Logger.warn(
26             "The leader won't start a new trader on #{symbol} " <>
27               "as symbol is in the 'shutdown' state"
28           )
29
30           if length(traders) == 1 do
31             Naive.stop_trading(state.symbol)
32           end
33
34           List.delete_at(traders, index)
35         else
36           new_trader_data = start_new_trader(fresh_trader_state(settings))
37           List.replace_at(traders, index, new_trader_data)
38         end
39
40       {:noreply, %{state | traders: new_traders}}
41   end
42 end
43 ...

As visible in the above code, whenever the Naive.Trader process finishes the trade cycle, the Naive.Leader process checks whether it can find a record of that trader in its state (no changes here). We modify the callback so the leader process checks the settings.status. In the shutdown status, it checks whether that was the last trader in the traders list, in order to terminate the whole tree at that time (using the Naive.stop_trading/1 function).
The second callback that we need to modify is the rebuy-triggered one:

1 # /apps/naive/lib/naive/leader.ex
2 ...
3 def handle_call(
4       {:rebuy_triggered, new_trader_state},
5       {trader_pid, _},
6       %{traders: traders, symbol: symbol, settings: settings} = state
7     ) do
8   case Enum.find_index(traders, &(&1.pid == trader_pid)) do
9     nil ->
10       Logger.warn("Rebuy triggered by trader that leader is not aware of")
11       {:reply, :ok, state}
12
13     index ->
14       traders =
15         if settings.chunks == length(traders) do
16           Logger.info("All traders already started for #{symbol}")
17           traders
18         else
19           if settings.status == "shutdown" do # <= refactored code
20             Logger.warn(
21               "The leader won't start a new trader on #{symbol} " <>
22                 "as symbol is in shutdown state(rebuy triggered)"
23             )
24
25             traders
26           else
27             Logger.info("Starting new trader for #{symbol}")
28             [start_new_trader(fresh_trader_state(settings)) | traders]
29           end
30         end
31
32       old_trader_data = Enum.at(traders, index)
33       new_trader_data = %{old_trader_data | :state => new_trader_state}
34       new_traders = List.replace_at(traders, index, new_trader_data)
35
36       {:reply, :ok, %{state | :traders => new_traders}}
37   end
38 end
39 ...

In the above rebuy_triggered handler function, we added branching on the settings.status, and we simply ignore the rebuy notification when the symbol is in the shutdown status.
The final change will be to create a new migration that will update the TradingStatusEnum to have the shutdown option:

1 $ cd apps/naive
2 $ mix ecto.gen.migration update_trading_status
3 * creating priv/repo/migrations/20210205232303_update_trading_status.exs

Inside the generated migration file, we need to execute a raw SQL command:

1 # /apps/naive/priv/repo/migrations/20210205232303_update_trading_status.exs
2 defmodule Naive.Repo.Migrations.UpdateTradingStatus do
3   use Ecto.Migration
4
5   @disable_ddl_transaction true
6
7   def change do
8     execute "ALTER TYPE trading_status ADD VALUE IF NOT EXISTS 'shutdown'"
9   end
10 end

We need to apply the same change to the TradingStatusEnum:

1 # /apps/naive/lib/naive/schema/trading_status_enum.ex
2 import EctoEnum
3
4 defenum(TradingStatusEnum, :trading_status, [:on, :off, :shutdown])

Don’t forget to run mix ecto.migrate to run the new migration.
We can now test the shutdown_trading/1 functionality inside iex:

1 $ iex -S mix
2 ...
3 iex(1)> Streamer.start_streaming("ethusdt")
4 [Link].651 [info] Starting streaming on ETHUSDT
5 {:ok, #PID<0.372.0>}
6 iex(2)> Naive.start_trading("ethusdt")
7 [Link].830 [info] Starting trading of ETHUSDT
8 [Link].867 [info] Starting new supervision tree to trade on ETHUSDT
9 {:ok, #PID<0.379.0>}
10 [Link].816 [info] Initializing new trader(1612648004814) for ETHUSDT
11 ...
12 [Link].448 [info] Rebuy triggered for ETHUSDT by the trader(1612648004814)
13 ...
14 [Link].900 [info] Rebuy triggered for ETHUSDT by the trader(1612648089409)
15 ...
16 [Link].927 [info] Rebuy triggered for ETHUSDT by the trader(1612648198900)
17 ...
18 [Link].202 [info] Rebuy triggered for ETHUSDT by the trader(1612648326215)
19 [Link].250 [info] Rebuy triggered for ETHUSDT by the trader(1612648325512)
20 [Link].250 [info] All traders already started for ETHUSDT
21
22 # at this moment we have 5 `Naive.Trader` processes trading in parallel
23
24 iex(4)> Naive.shutdown_trading("ethusdt")
25 [Link].556 [info] Shutdown of trading on ETHUSDT initialized
26 {:ok,
27  %Naive.Schema.Settings{
28 ...
29 }}
30 ...
31 [Link].855 [info] Trader(1612648407202) finished trade cycle for ETHUSDT
32 [Link].855 [info] ETHUSDT trader finished trade - restarting
33 [Link].855 [warn] The leader won't start a new trader on ETHUSDTas symbol is in s\
34 hutdown state
35 [Link].768 [info] Trader(1612648325512) finished trade cycle for ETHUSDT
36 [Link].768 [info] ETHUSDT trader finished trade - restarting
37 [Link].768 [warn] The leader won't start a new trader on ETHUSDTas symbol is in s\
38 hutdown state
39 [Link].857 [info] Trader(1612648326215) finished trade cycle for ETHUSDT
40 [Link].857 [info] ETHUSDT trader finished trade - restarting
41 [Link].857 [warn] The leader won't start a new trader on ETHUSDTas symbol is in s\
42 hutdown state
43 [Link].079 [info] Trader(1612648089409) finished trade cycle for ETHUSDT
44 [Link].079 [info] ETHUSDT trader finished trade - restarting
45 [Link].079 [warn] The leader won't start a new trader on ETHUSDTas symbol is in s\
46 hutdown state
47 [Link].401 [info] Trader(1612648004814) finished trade cycle for ETHUSDT
48 [Link].401 [info] ETHUSDT trader finished trade - restarting
49 [Link].401 [warn] The leader won't start a new trader on ETHUSDTas symbol is in s\
50 hutdown state
51 [Link].401 [info] Stopping trading of ETHUSDT

As we can see from the logs above, our naive strategy grew from 1 to 5 Naive.Trader processes running in parallel; then we called the shutdown_trading/1 function. In the shutdown status, the Naive.Leader process ignored rebuy notifications and wasn’t starting any new Naive.Trader processes as the old ones were finishing. At the moment when the last Naive.Trader process finished the trade cycle, the Naive.Leader called stop_trading/1 on “its” symbol, terminating the whole supervision tree for that symbol.
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github⁴⁴
⁴⁴[Link]
Chapter 13 - Abstract duplicated
supervision code
Objectives
• overview of requirements
• pseudo-generalize the Naive.DynamicSymbolSupervisor module
• utilize the pseudo-generalized code inside the Naive.DynamicSymbolSupervisor
• implement a truly generic Core.ServiceSupervisor
• use the Core.ServiceSupervisor module inside the streamer application

Overview of requirements
In the last few chapters, we went through adding and modifying the dynamic supervision tree
around the naive and streamer applications’ workers. Initially, we just copied the implementation
from the streamer application to the naive application (with a few minor tweaks like log messages).
That wasn’t the most sophisticated solution and we will address this copy-paste pattern in this
chapter.
We will write an “extension” of the DynamicSupervisor that allows us to start, stop, and autostart workers. Just to keep things simple, we will create a new application inside our umbrella project where we will place the logic. This will save us from creating a new repo for the time being.
Our new Core.ServiceSupervisor module will hold all the logic responsible for starting, stopping, and autostarting worker processes. To limit the boilerplate inside the implementation modules (like Naive.DynamicSymbolSupervisor or Streamer.DynamicStreamerSupervisor), we will utilize the use macro that will dynamically generate the low-level wiring for us.
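As a reminder of the mechanism (a generic sketch only - the final Core.ServiceSupervisor code arrives later in this chapter), use invokes a __using__/1 macro in the target module and injects the quoted code into the caller at compile time:

```elixir
defmodule Core.ServiceSupervisor do
  defmacro __using__(_opts) do
    quote location: :keep do
      use DynamicSupervisor

      # every module that calls `use Core.ServiceSupervisor`
      # gets this boilerplate injected at compile time
      def start_link(init_arg) do
        DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
      end

      def init(_init_arg) do
        DynamicSupervisor.init(strategy: :one_for_one)
      end
    end
  end
end
```

An implementation module would then need only `use Core.ServiceSupervisor` to pick up that wiring.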

Pseudo-generalize the Naive.DynamicSymbolSupervisor module


Let’s start by creating a new non-supervised application called core inside our umbrella project. At this moment our “abstraction code” will sit inside it just to keep things simple, as otherwise we would need to create a new repo and jump between codebases, which we will avoid for the time being:

1 $ cd apps
2 $ mix new core
3 * creating README.md
4 * creating .formatter.exs
5 * creating .gitignore
6 * creating mix.exs
7 * creating lib
8 * creating lib/core.ex
9 * creating test
10 * creating test/test_helper.exs
11 * creating test/core_test.exs
12 ...

We can now create a new directory called core inside the apps/core/lib directory, and a new file called service_supervisor.ex inside it, where we will put all the abstracted starting/stopping/autostarting logic.
Let’s start with an empty module:

1 # /apps/core/lib/core/service_supervisor.ex
2 defmodule Core.ServiceSupervisor do
3
4 end

The first step in our refactoring process will be to move (cut) all of the functions from the Naive.DynamicSymbolSupervisor (excluding start_link/1, init/1 and shutdown_trading/1) and put them inside the Core.ServiceSupervisor module, which should look as follows:

1 # /apps/core/lib/core/service_supervisor.ex
2 defmodule Core.ServiceSupervisor do
3
4 def autostart_trading() do
5 ...
6 end
7
8 def start_trading(symbol) when is_binary(symbol) do
9 ...
10 end
11
12 def stop_trading(symbol) when is_binary(symbol) do
13 ...
14 end
15

16 defp get_pid(symbol) do
17 ...
18 end
19
20 defp update_trading_status(symbol, status)
21 when is_binary(symbol) and is_binary(status) do
22 ...
23 end
24
25 defp start_symbol_supervisor(symbol) do
26 ...
27 end
28
29 defp fetch_symbols_to_trade() do
30 ...
31 end
32 end

All of the above code is trading related - we need to rename the functions/logs to be more generic.
Starting with autostart_trading/0, we can rename it to autostart_workers/0:

1 # /apps/core/lib/core/service_supervisor.ex
2 ...
3 def autostart_workers() do # <= updated function name
4   fetch_symbols_to_start() # <= updated function name
5   |> Enum.each(&start_worker/1) # <= updated function name
6 end
7 ...

As we updated two functions inside autostart_workers/0, we need to update their implementations.
The start_trading/1 will become start_worker/1; internally, we will inline the start_symbol_supervisor/1 function (move its contents inside the start_worker/1 function and remove the start_symbol_supervisor/1 function), as it’s used just once inside this module, and update_trading_status/2 needs to be renamed to update_status/2.
The fetch_symbols_to_trade/0 will get updated to fetch_symbols_to_start/0:



1 # /apps/core/lib/core/service_supervisor.ex
2 def start_worker(symbol) when is_binary(symbol) do # <= updated name
3   symbol = String.upcase(symbol)
4
5   case get_pid(symbol) do
6     nil ->
7       Logger.info("Starting trading of #{symbol}")
8       {:ok, _settings} = update_status(symbol, "on") # <= updated name
9
10       {:ok, _pid} =
11         DynamicSupervisor.start_child(
12           Naive.DynamicSymbolSupervisor,
13           {Naive.SymbolSupervisor, symbol}
14         ) # ^^^^^^ inlined `start_symbol_supervisor/1`
15
16     pid ->
17       Logger.warn("Trading on #{symbol} already started")
18       {:ok, _settings} = update_status(symbol, "on") # <= updated name
19       {:ok, pid}
20   end
21 end
22
23 ...
24
25 defp fetch_symbols_to_start() do # <= updated name
26   ...
27 end

Inside the above code, we updated the update_trading_status/2 call to update_status/2, so we need to update the function header to match:

1 # /apps/core/lib/core/service_supervisor.ex
2 defp update_status(symbol, status) # <= updated name
3 when is_binary(symbol) and is_binary(status) do
4 ...
5 end

The last function to rename in this module will be stop_trading/1 to stop_worker/1; we also need to update the calls to update_trading_status/2 to update_status/2, as it was renamed:

1 # /apps/core/lib/core/service_supervisor.ex
2 def stop_worker(symbol) when is_binary(symbol) do # <= updated name
3   symbol = String.upcase(symbol)
4
5   case get_pid(symbol) do
6     nil ->
7       Logger.warn("Trading on #{symbol} already stopped")
8       {:ok, _settings} = update_status(symbol, "off") # <= updated name
9
10     pid ->
11       Logger.info("Stopping trading of #{symbol}")
12
13       :ok =
14         DynamicSupervisor.terminate_child(
15           Naive.DynamicSymbolSupervisor,
16           pid
17         )
18
19       {:ok, _settings} = update_status(symbol, "off") # <= updated name
20   end
21 end

At this moment, we have a pseudo-generic implementation of start_worker/1 and stop_worker/1 inside the [Link] module. The function names are generic, but they still refer to Repo, Settings, and other modules specific to the naive app's implementation. We are probably in a worse situation than we were before starting this refactoring ;) but don't fear - this was just the first step on the way to abstracting away that starting, stopping, and autostarting code.

Utilize pseudo-generalized code inside the [Link]

Before we jump back to the naive application's modules, we need to add the core application to the dependencies of the naive application:

1 # /apps/naive/[Link]
2 defp deps do
3 [
4 {:binance, "~> 0.7.1"},
5 {:binance_mock, in_umbrella: true},
6 {:core, in_umbrella: true}, # <= core dep added
7 ....

Let’s get back to the [Link] where we expect functions that we just cut
out to exist like start_trading/1 or stop_trading/1.
Let’s reimplement more generic versions of those functions as just simple calls to the [Link]
module:

1 # /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
2 ...
3 def autostart_workers() do
4 [Link].autostart_workers()
5 end
6
7 def start_worker(symbol) do
8 [Link].start_worker(symbol)
9 end
10
11 def stop_worker(symbol) do
12 [Link].stop_worker(symbol)
13 end

We also need to update the shutdown_trading/1 function (renaming it to shutdown_worker/1), as we removed all the private functions that it relied on:

1 # /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
2 def shutdown_worker(symbol) when is_binary(symbol) do # <= updated name
3 symbol = [Link](symbol)
4
5 case [Link].get_pid(symbol) do # <= module added
6 nil ->
7 [Link]("Trading on #{symbol} already stopped")
8 {:ok, _settings} = [Link].update_status(symbol, "off") # <= \
9 updated name + module
10
11 _pid ->
12 [Link]("Shutdown of trading on #{symbol} initialized")
13 {:ok, settings} = [Link].update_status(symbol, "shutdown") #\
14 <= updated name + module
15 [Link](:settings_updated, settings)
16 {:ok, settings}
17 end
18 end

As we were moving the private helper functions across, we didn't make them public so that the [Link] module could use them - we will fix that now, together with adding temporary aliases/requires/imports at the top of the [Link]:

1 # /apps/core/lib/core/service_supervisor.ex
2 defmodule [Link] do
3
4 require Logger # <= added require
5
6 import [Link], only: [from: 2] # <= added import
7
8 alias [Link] # <= added alias
9 alias [Link] # <= added alias
10
11 ...
12
13 def get_pid(symbol) do # <= updated from private to public
14 ...
15 end
16
17 def update_status(symbol, status) # <= updated from private to public
18 when is_binary(symbol) and is_binary(status) do
19 ...
20 end

As fetch_symbols_to_start/0 is only used internally by the [Link] module itself, we don't need to make it public.
We can also remove the aliases and the import from the [Link], as it won't need them anymore.
The next step will be to add ecto to the deps of the core application, as it will now make database queries:

1 # /apps/core/[Link]
2 defp deps do
3 [
4 {:ecto_sql, "~> 3.0"}
5 ]
6 end

As we modified the interface of the [Link] (for example, renamed start_trading/1 to start_worker/1), we need to modify the [Link]'s children list - more specifically, the Task process:

1 # /apps/naive/lib/naive/[Link]
2 ...
3 {Task,
4 fn ->
5 [Link].autostart_workers() # <= func name updated
6 end}
7 ...

The last step will be to update the interface of the naive application:

1 # /apps/naive/lib/[Link]
2 alias [Link]
3
4 defdelegate start_trading(symbol), to: DynamicSymbolSupervisor, as: :start_worker
5 defdelegate stop_trading(symbol), to: DynamicSymbolSupervisor, as: :stop_worker
6 defdelegate shutdown_trading(symbol), to: DynamicSymbolSupervisor, as: :shutdown_w\
7 orker

Believe it or not, at this moment (ignoring all of the warnings caused by the circular dependency that we created between the core and the naive applications - we will fix it in the next steps) our application runs just fine! We are able to start and stop trading, and autostarting works as well.

Implement a truly generic [Link]

Ok. Why did we even do this? What we are aiming for is a separation between the interface of our [Link] module (like start_worker/1, autostart_workers/0 and stop_worker/1) and the implementation, which is now placed inside the [Link] module.

That's all nice and to some extent understandable, but the [Link] module is not a generic module. We can't use it inside the streaming application to supervise the [Link] processes.
So, what's the point? Well, we can make it even more generic!

First path starting with the fetch_symbols_to_start/0 function

Moving on to the full generalization of the [Link] module. We will start with the helper functions, as they are the ones doing the actual work and need to be truly generalized first:

1 # /apps/core/lib/core/service_supervisor.ex
2 def fetch_symbols_to_start() do
3 [Link](
4 from(s in Settings,
5 where: [Link] == "on",
6 select: [Link]
7 )
8 )
9 end

The fetch_symbols_to_start/0 function uses Repo and Settings that are aliased at the top of the
[Link] module. This just won’t work with any other applications as Streamer will
require its own Repo and Settings modules etc.
To fix that we will pass both repo and schema as arguments to the fetch_symbols_to_start/0
function which will become fetch_symbols_to_start/2:

1 # /apps/core/lib/core/service_supervisor.ex
2 def fetch_symbols_to_start(repo, schema) do # <= args added
3 [Link]( # <= lowercase `repo` is an argument not aliased module
4 from(s in schema, # <= settings schema module passed as arg
5 where: [Link] == "on",
6 select: [Link]
7 )
8 )
9 end

This will have a knock-on effect on any functions that use fetch_symbols_to_start/0 - they now need to call fetch_symbols_to_start/2 and pass the appropriate Repo and Schema modules.
The fetch_symbols_to_start/0 function is referenced by autostart_workers/0 - we will need to modify it to pass the repo and schema to fetch_symbols_to_start/2, and as it's inside the [Link] module, it needs to get them passed as arguments:

1 # /apps/core/lib/core/service_supervisor.ex
2 def autostart_workers(repo, schema) do # <= args added
3 fetch_symbols_to_start(repo, schema) # <= args passed
4 |> [Link](&start_worker/1)
5 end

Going even further down the line, autostart_workers/2 is referenced by the autostart_workers/0 inside the [Link] module. As this module is (naive) application-specific, it is a place where repo and schema are known from the context - for the naive application, repo is the [Link] module and schema is the [Link] module:

1 # /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
2 ...
3 def autostart_workers() do
4 [Link].autostart_workers(
5 [Link], # <= new value passed
6 [Link] # <= new value passed
7 )
8 end

This finishes the first of multiple paths that we need to follow to fully refactor the [Link]
module.

Second path starting with the update_status/2 function

Let's not waste time and start with the other helper function inside the [Link] module. This time we will make the update_status/2 function fully generic:

1 # /apps/core/lib/core/service_supervisor.ex
2 def update_status(symbol, status, repo, schema) # <= args added
3 when is_binary(symbol) and is_binary(status) do
4 repo.get_by(schema, symbol: symbol) # <= using dynamic repo and schema modules
5 |> [Link](%{status: status})
6 |> [Link]() # <= using dynamic repo module
7 end

As previously, we added repo and schema as arguments and modified the body of the function to utilize them instead of the hardcoded modules (aliased at the top of the [Link] module).
In the same fashion, we need to check "who" is using update_status/2 and update those calls to update_status/4. The function is used inside the start_worker/1 and the stop_worker/1 functions of the [Link] module, so we need to bubble the arguments up (pass them via arguments to both the start_worker/1 and stop_worker/1 functions):

1 # /apps/core/lib/core/service_supervisor.ex
2 def start_worker(symbol, repo, schema) when is_binary(symbol) do # <= new args
3 ...
4 {:ok, _settings} = update_status(symbol, "on", repo, schema) # <= args passed
5 ...
6 {:ok, _settings} = update_status(symbol, "on", repo, schema) # <= args passed
7 ...
8 end
9
10 def stop_worker(symbol, repo, schema) when is_binary(symbol) do # <= new args
11 ...
12 {:ok, _settings} = update_status(symbol, "off", repo, schema) # <= args pass\
13 ed
14 ...
15 {:ok, _settings} = update_status(symbol, "off", repo, schema) # <= args pass\
16 ed
17 ...
18 end

As we modified both start_worker/1 and stop_worker/1 by adding two additional arguments, we need to update all references to them - and here is where things branch out a bit.
We will start with the start_worker/1 function (which is now start_worker/3) - it's used by autostart_workers/2 inside the [Link] module. The autostart_workers/2 function already has repo and schema, so we can just pass them to start_worker/3:

1 # /apps/core/lib/core/service_supervisor.ex
2 def autostart_workers(repo, schema) do
3 fetch_symbols_to_start(repo, schema)
4 |> [Link](&start_worker(&1, repo, schema)) # <= args passed
5 end

Both the start_worker/3 and the stop_worker/3 functions are used inside the [Link] module. We need to pass the Repo and Schema modules in the same fashion as previously with the autostart_workers/2 function:

1 # /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
2 def start_worker(symbol) do
3 [Link].start_worker(
4 symbol,
5 [Link], # <= new arg passed
6 [Link] # <= new arg passed
7 )
8 end
9
10 def stop_worker(symbol) do
11 [Link].stop_worker(
12 symbol,
13 [Link], # <= new arg passed
14 [Link] # <= new arg passed
15 )
16 end

At this moment there’s no code inside the [Link] module referencing the aliased
Repo nor Schema modules so we can safely remove both aliases - definitely, we are moving in the
right direction!
Btw. Our project still works at this stage, we can start/stop trading and it autostarts trading.

Third path starting with the get_pid/1 function


Starting again from the most nested helper function - this time the get_pid/1:

1 # /apps/core/lib/core/service_supervisor.ex
2 def get_pid(symbol) do
3 [Link](:"[Link]-#{symbol}")
4 end

We can see that it has a hardcoded [Link] worker module - we need to make this
part dynamic by using the worker_module argument:

1 # /apps/core/lib/core/service_supervisor.ex
2 def get_pid(worker_module, symbol) do # <= arg added
3 [Link](:"#{worker_module}-#{symbol}") # <= arg used
4 end

Moving up to the functions that reference the get_pid/1 function - those will be the start_worker/3 and the stop_worker/3 functions.

As those are the last two functions to be updated, we will look into them more closely to finish our refactoring in this third run. At this moment, both need the worker_module argument, as both call the get_pid/2 function. Looking at both functions, we can see a few other hardcoded details:

• inside the log messages there's the word "trading" - we will replace it by utilizing the worker_module and symbol arguments
• there are two references to the [Link] module, which we will replace with the module argument
• there is one more reference to the [Link] module, which we will replace with the worker_module argument

Let's look at the updated functions:

1 # /apps/core/lib/core/service_supervisor.ex
2 # module and worker_module args added vvvv
3 def start_worker(symbol, repo, schema, module, worker_module)
4 when is_binary(symbol) do
5 symbol = [Link](symbol)
6
7 case get_pid(worker_module, symbol) do # <= worker_module passed
8 nil ->
9 [Link]("Starting #{worker_module} worker for #{symbol}") # <= dynamic t\
10 ext
11 {:ok, _settings} = update_status(symbol, "on", repo, schema)
12 {:ok, _pid} = DynamicSupervisor.start_child(module, {worker_module, symbol})\
13 # <= args used
14
15 pid ->
16 [Link]("#{worker_module} worker for #{symbol} already started") # <= dy\
17 namic text
18 {:ok, _settings} = update_status(symbol, "on", repo, schema)
19 {:ok, pid}
20 end
21 end
22
23 # module and worker_module added as args vvvv
24 def stop_worker(symbol, repo, schema, module, worker_module)
25 when is_binary(symbol) do
26 symbol = [Link](symbol)
27
28 case get_pid(worker_module, symbol) do # <= worker_module passed
29 nil ->
30 [Link]("#{worker_module} worker for #{symbol} already stopped") # <= dy\
31 namic text
32 {:ok, _settings} = update_status(symbol, "off", repo, schema)
33
34 pid ->
35 [Link]("Stopping #{worker_module} worker for #{symbol}") # <= dynamic t\
36 ext
37 :ok = DynamicSupervisor.terminate_child(module, pid) # <= arg used
38 {:ok, _settings} = update_status(symbol, "off", repo, schema)
39 end
40 end

Inside both the start_worker/5 and the stop_worker/5 functions we modified:

• get_pid/1 to pass the worker_module
• Logger's messages to use the worker_module and symbol
• DynamicSupervisor’s functions to use the module and the worker_module

Again, as we modified start_worker/5 we need to make the last change inside the [Link]
module - autostart_workers/2 uses the start_worker/5 function:

1 # /apps/core/lib/core/service_supervisor.ex
2 def autostart_workers(repo, schema, module, worker_module) do # <= args added
3 fetch_symbols_to_start(repo, schema)
4 |> [Link](&start_worker(&1, repo, schema, module, worker_module)) # <= args ad\
5 ded
6 end

Just for reference - the final function headers look as follows:

1 # /apps/core/lib/core/service_supervisor.ex
2 defmodule [Link] do
3 def autostart_workers(repo, schema, module, worker_module) do
4 ...
5 end
6
7 def start_worker(symbol, repo, schema, module, worker_module)
8 when is_binary(symbol) do
9 ...
10 end
11
12 def stop_worker(symbol, repo, schema, module, worker_module)
13 when is_binary(symbol) do
14 ...
15 end
16
17 def get_pid(worker_module, symbol) do
18 ...
19 end
20
21 def update_status(symbol, status, repo, schema)
22 when is_binary(symbol) and is_binary(status) do
23 ...
24 end
25
26 def fetch_symbols_to_start(repo, schema) do
27 ...
28 end
29 end

That finishes the third round of updates inside the [Link] module. Now we need to update the [Link] module to use the updated functions (and pass the required arguments):

1 # /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
2 ...
3 def autostart_workers() do
4 [Link].autostart_workers(
5 [Link],
6 [Link],
7 __MODULE__, # <= added arg
8 [Link] # <= added arg
9 )
10 end
11
12 def start_worker(symbol) do
13 [Link].start_worker(
14 symbol,
15 [Link],
16 [Link],
17 __MODULE__, # <= added arg
18 [Link] # <= added arg
19 )
20 end
21
22 def stop_worker(symbol) do
23 [Link].stop_worker(
24 symbol,
25 [Link],
26 [Link],
27 __MODULE__, # <= added arg
28 [Link] # <= added arg
29 )
30 end
31
32 def shutdown_worker(symbol) when is_binary(symbol) do
33 symbol = [Link](symbol)
34
35 case [Link].get_pid([Link], symbol) do # <= arg \
36 added
37 nil ->
38 [Link]("#{[Link]} worker for #{symbol} already stopped"\
39 ) # <= updated
40
41 {:ok, _settings} =
42 [Link].update_status(symbol, "off", [Link], [Link]\
43 [Link]) # <= args added
44
45 _pid ->
46 [Link]("Initializing shutdown of #{[Link]} worker for #\
47 {symbol}") # <= updated
48
49 {:ok, settings} =
50 [Link].update_status(
51 symbol,
52 "shutdown",
53 [Link],
54 [Link]
55 ) # ^^^ additional args passed
56
57 [Link](:settings_updated, settings)
58 {:ok, settings}
59 end
60 end

We needed to update the references inside the shutdown_worker/1 function as well, as it calls the get_pid/2 and update_status/4 functions.
We are now done with refactoring the [Link] module - it's completely generic and can be used inside both the streamer and the naive applications.

At this moment, to use the [Link] module, we need to write interface functions in our supervisors and pass multiple arguments in each one - again, we would need to keep copies of those functions inside both the streamer and the naive applications. In the next section, we will look into how we could leverage Elixir macros⁴⁵ to remove that boilerplate.

Remove boilerplate using use macro


Elixir provides a way to use other modules. The idea is that inside the [Link] module we are currently use-ing the DynamicSupervisor module, but we could use [Link] instead:

1 # /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
2 defmodule [Link] do
3 use [Link]

To be able to use the [Link] module, it needs to provide the __using__/1 macro. The simplest content of that macro would be to just use the DynamicSupervisor inside it:

1 # /apps/core/lib/core/service_supervisor.ex
2 defmacro __using__(opts) do
3 [Link](opts)
4 quote location: :keep do
5 use DynamicSupervisor
6 end
7 end

How does this work? As an oversimplification, you can think of it as Elixir looking through the contents of the quote's body (everything between quote ... do and end) in search of unquote calls, which can inject dynamic content. All of this will become much clearer as we go through the first example, but the important part is that after executing any potential unquotes inside the quote's body, Elixir will grab that code as if it were just a regular part of the code and place it inside the [Link] module at compile time (we will also see the result of [Link]/1 at compilation).

At this moment (after swapping to use [Link]), our code still works, and it's exactly as if we simply had use DynamicSupervisor inside the [Link] module - at compilation, it will be swapped to that either way (as per the contents of the __using__/1 macro).
⁴⁵[Link]
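Before we rely on this mechanism in our project, it may help to see quote/unquote in a minimal, standalone toy example (the GreeterMacro and MyService names below are invented purely for illustration and are not part of our codebase):

```elixir
# Toy example (not part of the book's codebase) showing how a value
# captured at compile time is injected into generated code via unquote.
defmodule GreeterMacro do
  defmacro __using__(opts) do
    # `name` is resolved when `use GreeterMacro, ...` is expanded
    name = Keyword.fetch!(opts, :name)

    quote location: :keep do
      # `unquote(name)` pastes the compile-time value into the function body
      def greet(), do: "Hello from #{unquote(name)}!"
    end
  end
end

defmodule MyService do
  use GreeterMacro, name: "MyService"
end

# MyService.greet() returns "Hello from MyService!"
```

Compiling the two modules above gives MyService a greet/0 function, exactly as if it had been written by hand inside that module.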

As the autostart_workers/0 function is a part of the boilerplate, we will move it from the [Link] module to the [Link] module, inside the __using__/1 macro.

Ok, but it has all of those other naive application-specific arguments - where will we get those? That's what the opts argument inside the __using__/1 macro is for. When we call use [Link], we can pass an additional keyword list containing all of the naive application-specific details:

1 # /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
2 defmodule [Link] do
3 use [Link],
4 repo: [Link],
5 schema: [Link],
6 module: __MODULE__,
7 worker_module: [Link]

We can now update the __using__/1 macro to assign all of those details to variables(instead of using
[Link]/1):

1 # /apps/core/lib/core/service_supervisor.ex
2 defmacro __using__(opts) do
3 {:ok, repo} = [Link](opts, :repo)
4 {:ok, schema} = [Link](opts, :schema)
5 {:ok, module} = [Link](opts, :module)
6 {:ok, worker_module} = [Link](opts, :worker_module)
7 ...

At this moment, we can use those dynamic values to generate code that will be specific to the implementation module - for example, the autostart_workers/0 function that we moved from the [Link] module, which will need different values passed to it (like [Link] as worker_module) for the streamer application. We can see that this requires inserting those dynamic values inside autostart_workers/0, but how do we dynamically inject arguments? unquote to the rescue. We will update the autostart_workers/0 function from:

1 # sample moved code from the `[Link]` module
2 def autostart_workers() do
3 [Link].autostart_workers(
4 [Link],
5 [Link],
6 __MODULE__,
7 [Link]
8 )
9 end

to:

1 # updated code that will become part of the `__using__/1` macro
2 def autostart_workers() do
3 [Link].autostart_workers(
4 unquote(repo),
5 unquote(schema),
6 unquote(module),
7 unquote(worker_module)
8 )
9 end

in the end, the generated code that will be "pasted" into the [Link] module at compile time will be:

1 # compiled code attached to the `[Link]` module
2 def autostart_workers() do
3 [Link].autostart_workers(
4 [Link],
5 [Link],
6 [Link],
7 [Link]
8 )
9 end

This way, we can dynamically create functions for any application (for the streamer application, it will generate a function call with the [Link], [Link] args, etc.).
We can apply that to all of the passed variables inside the autostart_workers/0 function - just for reference, the full macro will look as follows:

1 # /apps/core/lib/core/service_supervisor.ex
2 defmacro __using__(opts) do
3 {:ok, repo} = [Link](opts, :repo)
4 {:ok, schema} = [Link](opts, :schema)
5 {:ok, module} = [Link](opts, :module)
6 {:ok, worker_module} = [Link](opts, :worker_module)
7
8 quote location: :keep do
9 use DynamicSupervisor
10
11 def autostart_workers() do
12 [Link].autostart_workers(
13 unquote(repo),
14 unquote(schema),
15 unquote(module),
16 unquote(worker_module)
17 )
18 end
19 end
20 end

You can think about the above macro as substituting the unquote(..) parts with the passed values, then grabbing the whole contents between quote ... do and end and pasting them into the [Link] module at compile time - we can visualize the generated/"pasted" code as:

1 # code generated by the `__using__/1` macro that
2 # will be "pasted" to the `[Link]` module
3 use DynamicSupervisor
4
5 def autostart_workers() do
6 [Link].autostart_workers(
7 [Link],
8 [Link],
9 [Link],
10 [Link]
11 )
12 end

This is exactly the code that we had before inside the [Link] module, but now it's stored away inside the [Link]'s __using__/1 macro and doesn't need to be copied across the two apps anymore.

We can now follow the same principle and move start_worker/1 and stop_worker/1 from the
[Link] module into __using__/1 macro inside the [Link]
module:

1 # /apps/core/lib/core/service_supervisor.ex
2 # inside the __using__/1 macro
3 ...
4 def start_worker(symbol) do
5 [Link].start_worker(
6 symbol, # <= this needs to stay as variable
7 unquote(repo),
8 unquote(schema),
9 unquote(module),
10 unquote(worker_module)
11 )
12 end
13
14 def stop_worker(symbol) do
15 [Link].stop_worker(
16 symbol, # <= this needs to stay as variable
17 unquote(repo),
18 unquote(schema),
19 unquote(module),
20 unquote(worker_module)
21 )
22 end

Here we have an example of a runtime variable called symbol that we should not unquote, as its value will be different per function call (the source code should contain the symbol variable there, not, for example, "NEOUSDT").
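The distinction can be shown with another tiny, self-contained sketch (the WrapperMacro and Tagger names are made up for illustration): the compile-time option gets unquoted, while the runtime argument stays a plain variable inside the quote block:

```elixir
defmodule WrapperMacro do
  defmacro __using__(opts) do
    # known at compile time - safe to unquote
    prefix = Keyword.fetch!(opts, :prefix)

    quote do
      # `label` is a runtime parameter - it must stay a plain variable,
      # so every call can pass a different value
      def describe(label), do: "#{unquote(prefix)}: #{label}"
    end
  end
end

defmodule Tagger do
  use WrapperMacro, prefix: "worker"
end

# Tagger.describe("NEOUSDT") returns "worker: NEOUSDT"
```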

At this moment, the [Link] consists of only start_link/1, init/1 and shutdown_worker/1. It's under 50 lines of code and works exactly as it did before the refactoring. All of the boilerplate was moved to the [Link] module.
We left the shutdown_worker/1 function as it's specific to the naive application, but inside it, we utilize both the get_pid/2 and the update_status/4 functions, where we are passing the naive application-specific variables (like [Link]).
To make things even nicer we can create convenience wrappers for those two functions inside the
__using__/1 macro:

1 # /apps/core/lib/core/service_supervisor.ex
2 # add below at the end of `quote` block inside `__using__/1`
3 defp get_pid(symbol) do
4 [Link].get_pid(
5 unquote(worker_module),
6 symbol
7 )
8 end
9
10 defp update_status(symbol, status) do
11 [Link].update_status(
12 symbol,
13 status,
14 unquote(repo),
15 unquote(schema)
16 )
17 end

As those will get compiled and “pasted” into the [Link] module we
can utilize them inside the shutdown_worker/1 function as they would be much simpler naive
application-specific local functions:

1 # /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
2 def shutdown_worker(symbol) when is_binary(symbol) do
3 symbol = [Link](symbol)
4
5 case get_pid(symbol) do # <= macro provided function
6 nil ->
7 [Link]("#{[Link]} worker for #{symbol} already stopped")
8 {:ok, _settings} = update_status(symbol, "off") # <= macro provided function
9
10 _pid ->
11 [Link]("Initializing shutdown of #{[Link]} worker for #\
12 {symbol}")
13 {:ok, settings} = update_status(symbol, "shutdown") # <= macro provided func\
14 tion
15 [Link](:settings_updated, settings)
16 {:ok, settings}
17 end
18 end

And now, the very last change - I promise ;) Both the start_link/1 and the init/1 functions still reference the DynamicSupervisor module, which could be a little bit confusing - let's swap those calls to use the [Link] module (both to not confuse people and to be consistent with the use macro):

1 # /apps/naive/lib/naive/dynamic_symbol_supervisor.ex
2 def start_link(init_arg) do
3 [Link].start_link(__MODULE__, init_arg, name: __MODULE__)
4 end
5
6 def init(_init_arg) do
7 [Link](strategy: :one_for_one)
8 end

As we don’t want/need to do anything different inside the [Link] module than the
DynamicSupervisor is doing we can just delegate both of those inside the [Link]
module:

1 # /apps/core/lib/core/service_supervisor.ex
2 defdelegate start_link(module, args, opts), to: DynamicSupervisor
3 defdelegate init(opts), to: DynamicSupervisor

That finishes our refactoring of both the [Link] and the [Link]
modules.
We can test to confirm that everything works as expected:

1 $ iex -S mix
2 iex(1)> Naive.start_trading("NEOUSDT")
3 [Link].741 [info] Starting [Link] worker for NEOUSDT
4 [Link].768 [info] Starting new supervision tree to trade on NEOUSDT
5 {:ok, #PID<0.464.0>}
6 [Link].455 [info] Initializing new trader(1614462159452) for NEOUSDT
7 iex(2)> Naive.stop_trading("NEOUSDT")
8 [Link].362 [info] Stopping [Link] worker for NEOUSDT
9 {:ok,
10 %[Link]{
11 ...
12 }}
13 iex(3)> Naive.start_trading("BTCUSDT")
14 [Link].689 [info] Starting [Link] worker for BTCUSDT
15 [Link].723 [info] Starting new supervision tree to trade on BTCUSDT
16 {:ok, #PID<0.475.0>}
17 [Link].182 [info] Initializing new trader(1614462251182) for BTCUSDT
18 BREAK: (a)bort (A)bort with dump (c)ontinue (p)roc info (i)nfo
19 (l)oaded (v)ersion (k)ill (D)b-tables (d)istribution


20 $ iex -S mix
21 [Link].119 [info] Starting [Link] worker for BTCUSDT
22 [Link].161 [info] Starting new supervision tree to trade on BTCUSDT
23 [Link].213 [info] Initializing new trader(1614462444212) for BTCUSDT
24 iex(1)> Naive.shutdown_trading("BTCUSDT")
25 [Link].003 [info] Initializing shutdown of [Link] worker f\
26 or BTCUSDT
27 {:ok,
28 %[Link]{
29 ...
30 }}

The above test confirms that we can start, stop, and shut down trading on a symbol, and that autostarting of trading works as well.

Use the [Link] module inside the streamer application

As we are happy with the implementation of the [Link] module, we can upgrade the streamer application to use it.
We need to start by adding the core application to the list of dependencies of the streamer application:

1 # /apps/streamer/[Link]
2 defp deps do
3 [
4 {:binance, "~> 0.7.1"},
5 {:core, in_umbrella: true}, # <= core added to deps
6 ...

We can now move on to the [Link], where we will remove everything (really everything, including imports, aliases, and even requires) besides the start_link/1 and the init/1 functions. As with the [Link], we will use the [Link] and pass all required options - the full implementation of the [Link] module should look as follows:

1 # /apps/streamer/lib/streamer/dynamic_streamer_supervisor.ex
2 defmodule [Link] do
3 use [Link],
4 repo: [Link],
5 schema: [Link],
6 module: __MODULE__,
7 worker_module: [Link]
8
9 def start_link(init_arg) do
10 [Link].start_link(__MODULE__, init_arg, name: __MODULE__)
11 end
12
13 def init(_init_arg) do
14 [Link](strategy: :one_for_one)
15 end
16 end

Not much to add here - we are useing the [Link] module and passing options to
it so it can macro generates streamer application-specific wrappers(like start_worker/1 or stop_-
worker/1 with required repo, schema etc) around generic logic from the [Link]
module.
Using the [Link] module will have an impact on the interface of the [Link]
as it will now provide functions like start_worker/1 instead of start_streaming/1 etc.
As with the naive application, we need to update the Task function inside the [Link]
module:

1 # /apps/streamer/lib/streamer/[Link]
2 ...
3 {Task,
4 fn ->
5 [Link].autostart_workers()
6 end}
7 ...

The main Streamer module also needs to forward its calls to the renamed functions (note the as: option, as the public names differ from the new worker function names):

1 # /apps/streamer/lib/[Link]
2 alias [Link]
3
4 defdelegate start_streaming(symbol), to: DynamicStreamerSupervisor, as: :start_wor\
5 ker
6 defdelegate stop_streaming(symbol), to: DynamicStreamerSupervisor, as: :stop_worker

We can run a quick test to confirm that indeed everything works as expected:

1 $ iex -S mix
2 iex(1)> Streamer.start_streaming("NEOUSDT")
3 [Link].813 [info] Starting [Link] worker for NEOUSDT
4 {:ok, #PID<0.465.0>}
5 iex(2)> Streamer.stop_streaming("NEOUSDT")
6 [Link].212 [info] Stopping [Link] worker for NEOUSDT
7 {:ok,
8 %[Link]{
9 __meta__: #[Link]<:loaded, "settings">,
10 id: "db8c9429-2356-4243-a08f-0d0e89b74986",
11 inserted_at: ~N[2021-02-25 [Link],
12 status: "off",
13 symbol: "NEOUSDT",
14 updated_at: ~N[2021-02-27 [Link]
15 }}
16 iex(3)> Streamer.start_streaming("LTCUSDT")
17 [Link].361 [info] Starting [Link] worker for LTCUSDT
18 {:ok, #PID<0.490.0>}
19 BREAK: (a)bort (A)bort with dump (c)ontinue (p)roc info (i)nfo
20 (l)oaded (v)ersion (k)ill (D)b-tables (d)istribution
21 ^C
22 $ iex -S mix
23 ...
24 [Link].775 [info] Starting [Link] worker for LTCUSDT

This finishes the implementation for both the streamer and the naive applications. We are generating dynamic functions (metaprogramming) using Elixir macros, which is a cool exercise to go through and feels like superpowers ;)
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at Github⁴⁶
⁴⁶[Link]
Chapter 14 - Store trade events and
orders inside the database
Objectives
• overview of requirements
• create a new data_warehouse application in the umbrella
• connect to the database using Ecto
• store trade events’ data
• store orders’ data
• implement supervision

Overview of requirements
In the next chapter, we will move on to testing our strategy against historical data (aka backtesting - I will explain that process there). What we need to have in place before we are able to do that is both trade events and orders stored in the database.
Starting with the trade events: the streamer application could store trade events from Binance inside its database, but how would that work if we would like to introduce another source of non-streamed trade events (i.e. flat files, HTTP polling)? It would be better if the [Link] process kept on streaming those trade events as it is, and we created a new application that would subscribe to the existing trade_events:#{symbol} topic and store them in the database.
A similar idea applies to the orders' data. At this moment, the naive application uses the Binance module to place orders. We could store them inside the naive application's database, but how would that work if we would like to introduce another trading strategy? Holding data in separate databases for each strategy would cause further complications in future reporting, auditing, etc.
To store trade events' and orders' data, we will create a new application called data_warehouse inside our umbrella project. It will subscribe to the trade_events:#{symbol} stream as well as the orders:#{symbol} stream, convert the broadcasted data to its own representations (structs), and store it inside the database.
Trade events are already broadcasted to the PubSub topic, orders on the other hand aren’t. We
will need to modify the [Link] module to broadcast the new and updated orders to the
orders:#{symbol} topic.

After implementing the basic worker that will store the incoming data (trade events and orders) inside the database, we will look into adding a supervision tree utilizing Elixir's Registry⁴⁷. It will allow us to skip registering every worker under a unique atom and will offer an easy lookup to fetch pids instead.

Create a new data_warehouse application in the umbrella


Let’s start by creating a new application called data_warehouse inside our umbrella:

1 $ cd apps
2 $ mix new data_warehouse --sup
3 * creating [Link]
4 * creating .[Link]
5 * creating .gitignore
6 * creating [Link]
7 * creating lib
8 * creating lib/data_warehouse.ex
9 * creating lib/data_warehouse/[Link]
10 * creating test
11 * creating test/test_helper.exs
12 * creating test/data_warehouse_test.exs
13 ...

Connect to the database using Ecto


We can now follow similar steps as previously and add the required dependencies (like ecto) to its deps by modifying its mix.exs file:

# /apps/data_warehouse/mix.exs
...
  defp deps do
    [
      {:ecto_sql, "~> 3.0"},
      {:ecto_enum, "~> 1.4"},
      {:phoenix_pubsub, "~> 2.0"},
      {:postgrex, ">= 0.0.0"},
      {:streamer, in_umbrella: true}
    ]
  end
...
⁴⁷[Link]

Additionally, we added the phoenix_pubsub module to be able to subscribe to the PubSub topic and the streamer application to be able to use its Streamer.Binance.TradeEvent struct.
We can now jump back to the terminal to install the added dependencies and generate a new Repo module:

$ mix deps.get
...
$ cd apps/data_warehouse
$ mix ecto.gen.repo -r DataWarehouse.Repo
* creating lib/data_warehouse
* creating lib/data_warehouse/repo.ex
* updating ../../config/config.exs

Before we will be able to create migrations that will create our tables, we need to update the generated configuration inside the config/config.exs file:

# /config/config.exs
...
config :data_warehouse,              # <= added line
  ecto_repos: [DataWarehouse.Repo]   # <= added line

config :data_warehouse, DataWarehouse.Repo,
  database: "data_warehouse",        # <= updated line
  username: "postgres",              # <= updated line
  password: "postgres",              # <= updated line
  hostname: "localhost"
...

and add the DataWarehouse.Repo module to the children list of the DataWarehouse.Application's supervision tree:

# /apps/data_warehouse/lib/data_warehouse/application.ex
...
    children = [
      {DataWarehouse.Repo, []}
    ]
...

The last step will be to create the database by running the mix ecto.create -r DataWarehouse.Repo command.
That concludes the Ecto setup - we can now move on to the implementation of storing the orders and the trade events.

Store trade events’ data


The first step to store trade events inside the database will be to create a table that will hold our data. We will start by creating a migration:

$ cd apps/data_warehouse
$ mix ecto.gen.migration create_trade_events
* creating priv/repo/migrations
* creating priv/repo/migrations/20210222224514_create_trade_events.exs

The Streamer.Binance.TradeEvent struct will serve as a list of columns for our new trade_events table. Here's the full implementation of our migration:

# /apps/data_warehouse/priv/repo/migrations/20210222224514_create_trade_events.exs
defmodule DataWarehouse.Repo.Migrations.CreateTradeEvents do
  use Ecto.Migration

  def change do
    create table(:trade_events, primary_key: false) do
      add(:id, :uuid, primary_key: true)
      add(:event_type, :text)
      add(:event_time, :bigint)
      add(:symbol, :text)
      add(:trade_id, :integer)
      add(:price, :text)
      add(:quantity, :text)
      add(:buyer_order_id, :bigint)
      add(:seller_order_id, :bigint)
      add(:trade_time, :bigint)
      add(:buyer_market_maker, :bool)

      timestamps()
    end
  end
end

We added the additional id field to easily identify each trade event and our timestamps for
monitoring.
Let's run the migration so it will create a new trade_events table for us:

$ mix ecto.migrate

The next step will be to create a new directory called schema inside the apps/data_warehouse/lib/data_warehouse directory. Inside it, we need to create a new schema file called trade_event.ex. We can copy across the same columns from the migration straight to the schema:

# /apps/data_warehouse/lib/data_warehouse/schema/trade_event.ex
defmodule DataWarehouse.Schema.TradeEvent do
  use Ecto.Schema

  @primary_key {:id, :binary_id, autogenerate: true}

  schema "trade_events" do
    field(:event_type, :string)
    field(:event_time, :integer)
    field(:symbol, :string)
    field(:trade_id, :integer)
    field(:price, :string)
    field(:quantity, :string)
    field(:buyer_order_id, :integer)
    field(:seller_order_id, :integer)
    field(:trade_time, :integer)
    field(:buyer_market_maker, :boolean)

    timestamps()
  end
end

At this moment we should be able to execute CRUD (create, read [select], update, delete) operations over the table using the above struct.
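As a quick sanity check of that claim, we could insert and read a row from `iex -S mix` - a sketch only, assuming the repo from this chapter is configured and the database created; the values below are made up:

```elixir
# Run inside `iex -S mix` with the data_warehouse database migrated.
alias DataWarehouse.{Repo, Schema.TradeEvent}

# create
{:ok, row} =
  %TradeEvent{
    event_type: "trade",
    symbol: "XRPUSDT",
    price: "0.56554000",
    quantity: "1199.10000000"
  }
  |> Repo.insert()

# read - fetch the row back by its autogenerated binary id
Repo.get(TradeEvent, row.id)

# count all stored trade events
Repo.aggregate(TradeEvent, :count, :id)
```
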
Currently, we can already store the trade events' data inside the database, so we can move on to collecting it. Trade events are getting broadcast by the Streamer.Binance process here:

# /apps/streamer/lib/streamer/binance.ex
...
    Phoenix.PubSub.broadcast(
      Streamer.PubSub,
      "trade_events:#{trade_event.symbol}",
      trade_event
    )
...

We will implement a subscriber process that will be given a PubSub topic and will store incoming data inside the database.
Let's start by creating a new folder called subscriber inside the apps/data_warehouse/lib/data_warehouse directory, together with a new file called worker.ex inside it:

# /apps/data_warehouse/lib/data_warehouse/subscriber/worker.ex
defmodule DataWarehouse.Subscriber.Worker do
  use GenServer

  require Logger

  defmodule State do
    @enforce_keys [:topic]
    defstruct [:topic]
  end

  def start_link(topic) do
    GenServer.start_link(
      __MODULE__,
      topic,
      name: :"#{__MODULE__}-#{topic}"
    )
  end

  def init(topic) do
    {:ok,
     %State{
       topic: topic
     }}
  end
end

At this moment it's just a bog-standard implementation of a GenServer with a state struct containing a single key (:topic). We need to update the init/1 function to subscribe to the PubSub topic:

# /apps/data_warehouse/lib/data_warehouse/subscriber/worker.ex
  def init(topic) do
    Logger.info("DataWarehouse worker is subscribing to #{topic}")

    Phoenix.PubSub.subscribe(
      Streamer.PubSub,
      topic
    )
  ...

Next, we need to add a handler for received messages:

# /apps/data_warehouse/lib/data_warehouse/subscriber/worker.ex
  def handle_info(%Streamer.Binance.TradeEvent{} = trade_event, state) do
    opts =
      trade_event
      |> Map.from_struct()

    struct!(DataWarehouse.Schema.TradeEvent, opts)
    |> DataWarehouse.Repo.insert()

    {:noreply, state}
  end

As was the case with the Naive.Trader, all incoming messages trigger a handle_info/2 callback with the contents of the message and the current state of the subscriber worker. We just convert that incoming trade event to a map and then that map to the TradeEvent struct that gets inserted into the database.
This finishes the implementation of storing trade events, which we can test in the interactive shell by running:

$ iex -S mix
...
iex(1)> Streamer.start_streaming("XRPUSDT")
[Link].147 [info] Starting Streamer.Binance worker for XRPUSDT
{:ok, #PID<0.395.0>}
iex(2)> DataWarehouse.Subscriber.Worker.start_link("trade_events:XRPUSDT")
[Link].204 [info] DataWarehouse worker is subscribing to trade_events:XRPUSDT
{:ok, #PID<0.405.0>}

After a couple of minutes we can check the database using psql:


$ psql -Upostgres -h127.0.0.1
Password for user postgres:
...
postgres=# \c data_warehouse;
You are now connected to database "data_warehouse" as user "postgres".
data_warehouse=# \x
Expanded display is on.
data_warehouse=# SELECT * FROM trade_events;
-[ RECORD 1 ]------+-------------------------------------
id                 | f6eae686-946a-4e34-9c33-c7034c2cad5d
event_type         | trade
event_time         | 1614041388236
symbol             | XRPUSDT
trade_id           | 152765072
price              | 0.56554000
quantity           | 1199.10000000
buyer_order_id     | 1762454848
seller_order_id    | 1762454775
trade_time         | 1614041388235
buyer_market_maker | f
inserted_at        | 2021-02-23 [Link]
...

As we can see in the above output, trade events are now getting stored inside the database.

Store orders’ data


In the same fashion as with the trade events' data above, to store orders' data we will create an orders table inside a new migration:

$ cd apps/data_warehouse
$ mix ecto.gen.migration create_orders
* creating priv/repo/migrations/20210222224522_create_orders.exs

The list of columns for this table will be a copy of the Binance.Order⁴⁸ struct returned from the Binance exchange:

⁴⁸[Link]

# /apps/data_warehouse/priv/repo/migrations/20210222224522_create_orders.exs
defmodule DataWarehouse.Repo.Migrations.CreateOrders do
  use Ecto.Migration

  def change do
    create table(:orders, primary_key: false) do
      add(:order_id, :bigint, primary_key: true)
      add(:client_order_id, :text)
      add(:symbol, :text)
      add(:price, :text)
      add(:original_quantity, :text)
      add(:executed_quantity, :text)
      add(:cummulative_quote_quantity, :text)
      add(:status, :text)
      add(:time_in_force, :text)
      add(:type, :text)
      add(:side, :text)
      add(:stop_price, :text)
      add(:iceberg_quantity, :text)
      add(:time, :bigint)
      add(:update_time, :bigint)

      timestamps()
    end
  end
end

We updated all of the shortened names like orig_qty to full names like original_quantity.
Let's run the migration so it will create a new orders table for us:

$ mix ecto.migrate

We can copy the above fields list to create a schema module. First, let's create a new file called order.ex inside the apps/data_warehouse/lib/data_warehouse/schema directory:

# /apps/data_warehouse/lib/data_warehouse/schema/order.ex
defmodule DataWarehouse.Schema.Order do
  use Ecto.Schema

  @primary_key {:order_id, :integer, autogenerate: false}

  schema "orders" do
    field(:client_order_id, :string)
    field(:symbol, :string)
    field(:price, :string)
    field(:original_quantity, :string)
    field(:executed_quantity, :string)
    field(:cummulative_quote_quantity, :string)
    field(:status, :string)
    field(:time_in_force, :string)
    field(:type, :string)
    field(:side, :string)
    field(:stop_price, :string)
    field(:iceberg_quantity, :string)
    field(:time, :integer)
    field(:update_time, :integer)

    timestamps()
  end
end

We can now add a handler to our DataWarehouse.Subscriber.Worker that will convert the Binance.Order struct to a DataWarehouse.Schema.Order and store the data inside the database:

# /apps/data_warehouse/lib/data_warehouse/subscriber/worker.ex
  def handle_info(%Binance.Order{} = order, state) do
    data =
      order
      |> Map.from_struct()

    struct(DataWarehouse.Schema.Order, data)
    |> Map.merge(%{
      original_quantity: order.orig_qty,
      executed_quantity: order.executed_qty,
      cummulative_quote_quantity: order.cummulative_quote_qty,
      iceberg_quantity: order.iceberg_qty
    })
    |> DataWarehouse.Repo.insert(
      on_conflict: :replace_all,
      conflict_target: :order_id
    )

    {:noreply, state}
  end
...

In the above code, we are copying the matching fields using the struct/2 function, but all other fields that aren't 1 to 1 between the two structs won't be copied, so we need to merge them in a second step (using the Map.merge/2 function). We are also using the on_conflict: :replace_all option to make the insert/2 function behave like upsert/2 (to avoid writing separate logic for inserting and updating the orders).
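The upsert behaviour is worth seeing on its own. A hypothetical `iex -S mix` session (made-up values, assuming the orders table from this chapter exists) would show that inserting the same order_id twice replaces the row instead of raising a constraint error:

```elixir
alias DataWarehouse.{Repo, Schema.Order}

upsert = fn status ->
  %Order{order_id: 123, symbol: "NEOUSDT", status: status}
  |> Repo.insert(on_conflict: :replace_all, conflict_target: :order_id)
end

{:ok, _} = upsert.("NEW")     # first insert creates the row
{:ok, _} = upsert.("FILLED")  # second insert replaces it

Repo.get(Order, 123).status   # "FILLED"
```
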
Having all of this in place, we will now be able to store broadcasted orders' data in the database, but there's nothing actually broadcasting them.
We need to modify the Naive.Trader module to broadcast the Binance.Order whenever it places buy/sell orders or fetches them again:

# /apps/naive/lib/naive/trader.ex
...
  # inside the place-initial-buy-order callback
  {:ok, %Binance.OrderResponse{} = order} =
    @binance_client.order_limit_buy(symbol, quantity, price, "GTC")

  :ok = broadcast_order(order)
...

  # inside the buy order (partially) filled callback
  {:ok, %Binance.Order{} = current_buy_order} =
    @binance_client.get_order(
      symbol,
      timestamp,
      order_id
    )

  :ok = broadcast_order(current_buy_order)
...

  # inside the same callback, in case the buy order got filled
  {:ok, %Binance.OrderResponse{} = order} =
    @binance_client.order_limit_sell(symbol, quantity, sell_price, "GTC")

  :ok = broadcast_order(order)
...

  # inside the sell order (partially) filled callback
  {:ok, %Binance.Order{} = current_sell_order} =
    @binance_client.get_order(
      symbol,
      timestamp,
      order_id
    )

  :ok = broadcast_order(current_sell_order)
...

The above four places send both the Binance.OrderResponse and the Binance.Order structs - our broadcast_order/1 function needs to be able to handle them both. Add the following at the bottom of the Naive.Trader module:

# /apps/naive/lib/naive/trader.ex
  defp broadcast_order(%Binance.OrderResponse{} = response) do
    response
    |> convert_to_order()
    |> broadcast_order()
  end

  defp broadcast_order(%Binance.Order{} = order) do
    Phoenix.PubSub.broadcast(
      Streamer.PubSub,
      "orders:#{order.symbol}",
      order
    )
  end

  defp convert_to_order(%Binance.OrderResponse{} = response) do
    data =
      response
      |> Map.from_struct()

    struct(Binance.Order, data)
    |> Map.merge(%{
      cummulative_quote_qty: "0.00000000",
      stop_price: "0.00000000",
      iceberg_qty: "0.00000000",
      is_working: true
    })
  end

As the DataWarehouse.Subscriber.Worker process expects only Binance.Order structs to be broadcast, we first check whether the passed value is a Binance.OrderResponse struct, convert it to a Binance.Order struct if that's the case, and only then broadcast it to the PubSub topic.

The converting logic, as previously, uses the struct/2 function, but it also merges in default values that are missing from the much smaller Binance.OrderResponse struct (in comparison to the Binance.Order struct).

At this moment we will be able to store orders inside the database, and we can check that by running:

$ iex -S mix
...
iex(1)> DataWarehouse.Subscriber.Worker.start_link("orders:NEOUSDT")
[Link].043 [info] DataWarehouse worker is subscribing to orders:NEOUSDT
{:ok, #PID<0.400.0>}
iex(2)> Naive.start_trading("NEOUSDT")
[Link].741 [info] Starting Naive.SymbolSupervisor worker for NEOUSDT
[Link].832 [info] Starting new supervision tree to trade on NEOUSDT
{:ok, #PID<0.402.0>}
[Link].654 [info] Initializing new trader(1614119921653) for NEOUSDT
iex(3)> Streamer.start_streaming("NEOUSDT")
[Link].786 [info] Starting Streamer.Binance worker for NEOUSDT
{:ok, #PID<0.412.0>}
[Link].187 [info] The trader(1614119921653) is placing a BUY order for NEOUSDT @ 37.549, quantity: 5.326
[Link].449 [info] The trader(1614119921653) is placing a SELL order for NEOUSDT @ 37.578, quantity: 5.326.

At this moment inside the DataWarehouse's database we should see orders:

$ psql -Upostgres -h127.0.0.1
Password for user postgres:
...
postgres=# \c data_warehouse;
You are now connected to database "data_warehouse" as user "postgres".
data_warehouse=# \x
Expanded display is on.
data_warehouse=# SELECT * FROM orders;
-[ RECORD 1 ]--------------+---------------------------------
order_id                   | 1
client_order_id            | C81E728D9D4C2F636F067F89CC14862C
symbol                     | NEOUSDT
price                      | 38.16
original_quantity          | 5.241
executed_quantity          | 0.00000000
cummulative_quote_quantity | 0.00000000
status                     | FILLED
time_in_force              | GTC
type                       | LIMIT
side                       | BUY
stop_price                 | 0.00000000
iceberg_quantity           | 0.00000000
time                       | 1614120906320
update_time                | 1614120906320
inserted_at                | 2021-02-23 [Link]
updated_at                 | 2021-02-23 [Link]
-[ RECORD 2 ]--------------+---------------------------------
order_id                   | 2
client_order_id            | ECCBC87E4B5CE2FE28308FD9F2A7BAF3
symbol                     | NEOUSDT
price                      | 38.19
original_quantity          | 5.241
executed_quantity          | 0.00000000
cummulative_quote_quantity | 0.00000000
status                     | NEW
time_in_force              | GTC
type                       | LIMIT
side                       | SELL
stop_price                 | 0.00000000
iceberg_quantity           | 0.00000000
time                       |
update_time                |
inserted_at                | 2021-02-23 [Link]
updated_at                 | 2021-02-23 [Link]
The first record above got inserted and then updated, as its status is "FILLED"; the second one wasn't updated yet, as it's still in the "NEW" status - that confirms that the upsert trick works.
That finishes the implementation of storing orders inside the database.

Implement supervision
Currently, we have a DataWarehouse.Subscriber.Worker process that will take care of storing data in the database, but sadly, if anything goes wrong inside our worker and it crashes, there's no supervision in place to restart it.
The supervision tree for the data_warehouse application will be similar to the ones from the naive and streamer apps, but different enough not to use the Core.ServiceSupervisor abstraction.

For example, it doesn't use the symbol column; it works based on the topic column. This would require changes to the Core.ServiceSupervisor's functions like update_status/4 or fetch_symbols_to_start/2 - we could update them to accept a column name, but that would need to be passed through other functions. We can see that this is probably not the best approach, and the further we go, the more complex it will become. The second issue is that we are registering all processes with names, and that can be problematic as the list of processes starts to grow (as we can imagine in the case of the data_warehouse application).
The better approach would be to combine the DynamicSupervisor⁴⁹ with the Registry⁵⁰.
The DynamicSupervisor will supervise the Subscriber.Workers, and instead of keeping track of them by registering them using atoms, we will start them :via the Elixir Registry.
We will add all the functionality that we implemented for the naive and streamer applications. We will provide functions to start and stop storing data on passed PubSub topics, as well as store those topics inside the database so storing will be autostarted.

Create subscriber_settings table

To provide the autostarting function, we need to create a new migration that will create the subscriber_settings table:

$ cd apps/data_warehouse
$ mix ecto.gen.migration create_subscriber_settings
* creating priv/repo/migrations/20210227230123_create_subscriber_settings.exs

At this moment we can copy the code that creates the settings table (enum and index as well) from the streamer application and tweak it to fit the data_warehouse application. The first important change (besides updating the namespaces from Streamer to DataWarehouse) will be to note that we have a setting per topic - not per symbol as in the naive and streamer applications:

⁴⁹[Link]
⁵⁰[Link]

# /apps/data_warehouse/priv/repo/migrations/20210227230123_create_subscriber_settings.exs
defmodule DataWarehouse.Repo.Migrations.CreateSubscriberSettings do
  use Ecto.Migration

  alias DataWarehouse.Schema.SubscriberStatusEnum

  def change do
    SubscriberStatusEnum.create_type()

    create table(:subscriber_settings, primary_key: false) do
      add(:id, :uuid, primary_key: true)
      add(:topic, :text, null: false)
      add(:status, SubscriberStatusEnum.type(), default: "off", null: false)

      timestamps()
    end

    create(unique_index(:subscriber_settings, [:topic]))
  end
end

Both the schema and the enum will be almost identical to the ones from the streamer application - we can simply copy those files and apply basic tweaks like updating the namespace:

$ cp apps/streamer/lib/streamer/schema/settings.ex \
    apps/data_warehouse/lib/data_warehouse/schema/subscriber_settings.ex
$ cp apps/streamer/lib/streamer/schema/streaming_status_enum.ex \
    apps/data_warehouse/lib/data_warehouse/schema/subscriber_status_enum.ex

Remember to update the symbol column to topic, as well as the table name, inside the copied schema file:

# /apps/data_warehouse/lib/data_warehouse/schema/subscriber_settings.ex
defmodule DataWarehouse.Schema.SubscriberSettings do
  use Ecto.Schema

  alias DataWarehouse.Schema.SubscriberStatusEnum

  @primary_key {:id, :binary_id, autogenerate: true}

  schema "subscriber_settings" do
    field(:topic, :string)
    field(:status, SubscriberStatusEnum)

    timestamps()
  end
end

Inside apps/data_warehouse/lib/data_warehouse/schema/subscriber_status_enum.ex we need to swap references of Streamer to DataWarehouse and references of StreamingStatusEnum to SubscriberStatusEnum:

# /apps/data_warehouse/lib/data_warehouse/schema/subscriber_status_enum.ex
import EctoEnum

defenum(DataWarehouse.Schema.SubscriberStatusEnum, :subscriber_status, [:on, :off])

Don't forget to run the migration:

$ mix ecto.migrate

At this moment we have all the pieces in place to execute queries on our new table. Here we can think about the seeding script. For the data_warehouse application specifically, we won't need to provide one, as we don't know in advance what topic names we will use. Instead of seeding settings in advance, our code will "upsert" (using the insert function) settings when starting or stopping storing for a topic.

Redesign supervision using Registry

We can now focus on drafting a supervision tree for the data_warehouse application. At this moment we have only the DataWarehouse.Repo and the DataWarehouse.Subscriber.Worker modules. As it was in the case of the naive and streamer applications, we will need an additional level of supervision to cater for the "autostarting" Task, as well as - in the case of the data_warehouse application - the Registry.
The full supervision tree will look as follows:

Supervision tree with Registry

Everything looks very similar to the supervision trees that we created in the streamer and the naive applications, but there's an additional Registry that is supervised by the SubscriberSupervisor process.
The idea is that inside the Worker module's start_link/1 we will register worker processes using a :via⁵¹ tuple. Internally, GenServer will utilize the Registry's functions like register_name/2 to add the process to the registry under the topic string. This way we will be able to retrieve pids assigned to topics using those topic strings instead of registering each worker process under an atom name.
Just as previously, the DynamicSupervisor will be in charge of supervising the Worker processes, and it won't even be aware that we are using the Registry to keep track of the topic => pid associations.
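The mechanics of :via registration can be seen in isolation with plain Elixir - a standalone sketch, independent of the book's code, where the :demo_registry name and the Agent are just placeholders:

```elixir
# Standalone sketch of Registry + :via registration (run with `elixir demo.exs`).
{:ok, _} = Registry.start_link(keys: :unique, name: :demo_registry)

# The :via tuple tells Agent/GenServer to register via Registry.register_name/2
name = {:via, Registry, {:demo_registry, "trade_events:XRPUSDT"}}
{:ok, pid} = Agent.start_link(fn -> 0 end, name: name)

# The process can now be looked up by the topic string instead of an atom
[{^pid, nil}] = Registry.lookup(:demo_registry, "trade_events:XRPUSDT")

# Starting a second process under the same key is rejected
{:error, {:already_started, ^pid}} = Agent.start_link(fn -> 0 end, name: name)
```

This is exactly the behaviour the DynamicSupervisor below relies on: lookups by string key and a built-in guard against duplicate workers.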

Create the DataWarehouse.Subscriber.DynamicSupervisor module

Let's start by creating a new file called dynamic_supervisor.ex inside the apps/data_warehouse/lib/data_warehouse/subscriber directory and put the default dynamic supervisor implementation inside:

⁵¹[Link]

# /apps/data_warehouse/lib/data_warehouse/subscriber/dynamic_supervisor.ex
defmodule DataWarehouse.Subscriber.DynamicSupervisor do
  use DynamicSupervisor

  def start_link(_arg) do
    DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end

As we will put all our logic related to autostarting, starting, and stopping inside this module, we can already add the aliases, import, and require:

# /apps/data_warehouse/lib/data_warehouse/subscriber/dynamic_supervisor.ex
  require Logger

  alias DataWarehouse.Repo
  alias DataWarehouse.Schema.SubscriberSettings
  alias DataWarehouse.Subscriber.Worker

  import Ecto.Query, only: [from: 2]

  @registry :subscriber_workers

Additionally, we added the @registry module attribute that we will use to retrieve the pid for a specific topic.
We can move on to implementing autostart_workers/0, which will look very similar to the ones that we implemented in the streamer and the naive applications:

# /apps/data_warehouse/lib/data_warehouse/subscriber/dynamic_supervisor.ex
...
  def autostart_workers() do
    Repo.all(
      from(s in SubscriberSettings,
        where: s.status == "on",
        select: s.topic
      )
    )
    |> Enum.map(&start_child/1)
  end

  defp start_child(args) do
    DynamicSupervisor.start_child(
      __MODULE__,
      {Worker, args}
    )
  end

We can see that we are querying the database for a list of topics (not symbols) and we are calling start_child/1 for each result.

The start_worker/1 function is where the Registry will shine, as we won't need to check whether there's already a process running for that topic - we can leave that check to the Registry. If there's a process already running for that topic, it will just return a tuple starting with the :error atom:

# /apps/data_warehouse/lib/data_warehouse/subscriber/dynamic_supervisor.ex
...
  def start_worker(topic) do
    Logger.info("Starting storing data from #{topic} topic")
    update_status(topic, "on")
    start_child(topic)
  end
...
  defp update_status(topic, status)
       when is_binary(topic) and is_binary(status) do
    %SubscriberSettings{
      topic: topic,
      status: status
    }
    |> Repo.insert(
      on_conflict: :replace_all,
      conflict_target: :topic
    )
  end

As we are not seeding the database with default settings, we use the insert/2 function with options (as previously) to make it behave like an "upsert".
The last function in this module will be stop_worker/1, which uses the private stop_child/1 function. The stop_child/1 function shows how to retrieve the pid of the process assigned to the passed topic:

# /apps/data_warehouse/lib/data_warehouse/subscriber/dynamic_supervisor.ex
...
  def stop_worker(topic) do
    Logger.info("Stopping storing data from #{topic} topic")
    update_status(topic, "off")
    stop_child(topic)
  end
...
  defp stop_child(args) do
    case Registry.lookup(@registry, args) do
      [{pid, _}] -> DynamicSupervisor.terminate_child(__MODULE__, pid)
      _ -> Logger.warn("Unable to locate process assigned to #{inspect(args)}")
    end
  end

That is the full implementation of the DataWarehouse.Subscriber.DynamicSupervisor module, and it's almost as slim as the one from the last chapter, where we leveraged macros to achieve that lightness.
Using the Registry is the preferred way to manage a list of identifiable processes. We won't run into the issue of overusing atoms (as they are not garbage collected, we could hit the limit sooner or later).
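To make the atom exhaustion point concrete, the BEAM's atom table only ever grows, which is easy to observe with a throwaway sketch (safe to run once; the limit mentioned is the VM default):

```elixir
# Atoms are never garbage collected - every dynamically created atom
# permanently grows the VM's atom table (default limit: 1_048_576 atoms).
count_before = :erlang.system_info(:atom_count)

# Creating an atom from a runtime string adds a brand new entry
_ = String.to_atom("topic_#{:erlang.unique_integer([:positive])}")

count_after = :erlang.system_info(:atom_count)
IO.puts("atom table grew by #{count_after - count_before}")
```

Registering one atom per worker would chip away at that limit forever; Registry keys are plain strings and carry no such cost.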

Register Worker processes using :via

The above DynamicSupervisor module assumes that Workers are registered inside the Registry - to make this happen we will need to update the start_link/1 function of the DataWarehouse.Subscriber.Worker module:

# /apps/data_warehouse/lib/data_warehouse/subscriber/worker.ex
...
  def start_link(topic) do
    GenServer.start_link(
      __MODULE__,
      topic,
      name: via_tuple(topic)
    )
  end
...
  defp via_tuple(topic) do
    {:via, Registry, {:subscriber_workers, topic}}
  end
...

By passing the :name option to the GenServer's start_link/3 function, we instruct it to utilize the Registry module to register processes under topic names.

Create a new supervision level for the Registry, Task and the DynamicSupervisor

We have the lowest-level modules - the Worker and the DynamicSupervisor - implemented; time to add a new Supervisor that will start the Registry, the DynamicSupervisor, and the autostart storing Task. First, create a new file called subscriber_supervisor.ex inside the apps/data_warehouse/lib/data_warehouse directory:

# /apps/data_warehouse/lib/data_warehouse/subscriber_supervisor.ex
defmodule DataWarehouse.SubscriberSupervisor do
  use Supervisor

  alias DataWarehouse.Subscriber.DynamicSupervisor

  @registry :subscriber_workers

  def start_link(_args) do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_args) do
    children = [
      {Registry, [keys: :unique, name: @registry]},
      {DynamicSupervisor, []},
      {Task,
       fn ->
         DynamicSupervisor.autostart_workers()
       end}
    ]

    Supervisor.init(children, strategy: :rest_for_one)
  end
end

The important part here is to match the Registry name to the one defined inside the DynamicSupervisor and the Worker modules.

Link the SubscriberSupervisor to the Application

We need to update the DataWarehouse.Application module to start our new DataWarehouse.SubscriberSupervisor process, as well as register itself under a name matching its module (just for consistency with the other applications):

# /apps/data_warehouse/lib/data_warehouse/application.ex
...
  def start(_type, _args) do
    children = [
      {DataWarehouse.Repo, []},
      {DataWarehouse.SubscriberSupervisor, []} # <= new module added
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: DataWarehouse.Application] # <= name updated
    Supervisor.start_link(children, opts)
  end
...

Add interface
The final step will be to add an interface to the DataWarehouse application to start and stop storing:

# /apps/data_warehouse/lib/data_warehouse.ex
  alias DataWarehouse.Subscriber.DynamicSupervisor

  def start_storing(stream, symbol) do
    DynamicSupervisor.start_worker("#{String.downcase(stream)}:#{String.upcase(symbol)}")
  end

  def stop_storing(stream, symbol) do
    DynamicSupervisor.stop_worker("#{String.downcase(stream)}:#{String.upcase(symbol)}")
  end

Inside the above functions, we are just normalizing the case of the passed arguments, following the rule that topics consist of the downcased stream, a :, and the upcased symbol.
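That naming rule can be captured in a tiny helper - hypothetical, since the book builds the string inline; the TopicName module exists only for illustration:

```elixir
# Hypothetical helper mirroring the inline topic construction above:
# downcased stream name + ":" + upcased symbol.
defmodule TopicName do
  def build(stream, symbol) do
    "#{String.downcase(stream)}:#{String.upcase(symbol)}"
  end
end

IO.puts(TopicName.build("TRADE_EVENTS", "neousdt"))
# => trade_events:NEOUSDT
```
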

Test
The interface above was the last step in our implementation - we can now test that everything works as expected:

$ iex -S mix
...
iex(1)> DataWarehouse.start_storing("trade_events", "NEOUSDT")
[Link].740 [info] Starting storing data from trade_events:NEOUSDT topic
[Link].847 [info] DataWarehouse worker is subscribing to trade_events:NEOUSDT
{:ok, #PID<0.429.0>}
iex(2)> DataWarehouse.start_storing("trade_events", "NEOUSDT")
[Link].753 [info] Starting storing data from trade_events:NEOUSDT topic
{:error, {:already_started, #PID<0.459.0>}}
iex(3)> DataWarehouse.start_storing("orders", "NEOUSDT")
[Link].386 [info] Starting storing data from orders:NEOUSDT topic
[Link].403 [info] DataWarehouse worker is subscribing to orders:NEOUSDT
{:ok, #PID<0.431.0>}
BREAK: (a)bort (A)bort with dump (c)ontinue (p)roc info (i)nfo
       (l)oaded (v)ersion (k)ill (D)b-tables (d)istribution
^C%
$ iex -S mix
...
[Link].058 [info] DataWarehouse worker is subscribing to trade_events:NEOUSDT
[Link].062 [info] DataWarehouse worker is subscribing to orders:NEOUSDT
# autostart works ^^^
iex(1)> Naive.start_trading("NEOUSDT")
[Link].316 [info] Starting Naive.SymbolSupervisor worker for NEOUSDT
[Link].417 [info] Starting new supervision tree to trade on NEOUSDT
{:ok, #PID<0.419.0>}
iex(3)>
[Link].484 [info] Initializing new trader(1615221407466) for NEOUSDT
iex(2)> Streamer.start_streaming("NEOUSDT")
[Link].660 [info] Starting Streamer.Binance worker for NEOUSDT
{:ok, #PID<0.428.0>}
...
iex(3)> DataWarehouse.stop_storing("trade_events", "NEOUSDT")
[Link].398 [info] Stopping storing data from trade_events:NEOUSDT topic
:ok
iex(4)> DataWarehouse.stop_storing("trade_events", "NEOUSDT")
[Link].151 [info] Stopping storing data from trade_events:NEOUSDT topic
[Link].160 [warn] Unable to locate process assigned to "trade_events:NEOUSDT"
:ok
iex(5)> [{pid, nil}] = Registry.lookup(:subscriber_workers, "orders:NEOUSDT")
[{#PID<0.417.0>, nil}]
iex(6)> Process.exit(pid, :crash)
true
[Link].812 [info] DataWarehouse worker is subscribing to orders:NEOUSDT

As we can see, even this simple implementation handles starting, autostarting, and stopping. It also gracefully handles starting workers when one is already running, as well as stopping when none are running.
As a challenge, you could update the naive and the streamer applications to use Registry and remove the [Link] module, as it has been superseded by the above solution.

[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at GitHub⁵²
⁵²[Link]
Chapter 15 - Backtest trading strategy
Objectives
• overview of requirements
• implement the storing task
• test the backtesting

Overview of requirements
In the last chapter, we started storing trade events and orders in the database, which will be crucial for backtesting - the focus of this chapter.
Backtesting is the procedure of running historical data through the system and observing how our strategy would have performed as if we had run it “in the past”. Backtesting works on the assumption that the market will behave in the future in a similar fashion to how it behaved in the past.
At this moment, we are receiving trade events from Binance through WebSocket. The
[Link] process handles those messages by parsing them from JSON strings to maps,
converting them to structs, and broadcasting them to the trade_events:#{symbol} PubSub topic.
The [Link] subscribes to the trade_events:#{symbol} topic and makes decisions based on
the incoming data. As it places buy and sell orders, it broadcasts them to the orders:#{symbol} PubSub
topic. The [Link] processes subscribe to both the trade events and the orders
topics and store the incoming data inside the database. We can visualize that flow as follows:

Normal broadcast/subscribe flow

To backtest, we can substitute the [Link] process with a Task that will stream trade
events' data from the database and broadcast it to the trade_events:#{symbol} PubSub topic (the
same topic that the [Link] process uses).
From the perspective of the [Link], it does not make any difference who is broadcasting
those trade events. This is a clear indication of the value of the publish/subscribe model that we
implemented from the beginning. It allows us to swap producers and consumers freely to backtest
our trading strategies:

Task feeds the stream of data from the database straight to the PubSub topic

Implement the storing task


We will start by creating a new file called [Link] inside the apps/data_warehouse/lib/data_warehouse directory and implementing the basic Task behavior:

# /apps/data_warehouse/lib/data_warehouse/[Link]
defmodule [Link] do
  use Task

  def start_link(arg) do
    Task.start_link(__MODULE__, :run, [arg])
  end

  def run(arg) do
    # ...
  end
end
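To see the shape of this pattern in isolation: use Task gives the module a child spec so a supervisor can run it, while Task.start_link/3 spawns a linked process that executes run/1. Below is a minimal, standalone sketch of the same shape - EchoTask is a hypothetical module name, not part of the project:

```elixir
defmodule EchoTask do
  use Task

  def start_link(arg) do
    # Spawns a linked process that calls EchoTask.run(arg)
    Task.start_link(__MODULE__, :run, [arg])
  end

  def run(arg) do
    IO.puts("task running with #{inspect(arg)}")
  end
end

{:ok, _pid} = EchoTask.start_link(:hello)

# Give the spawned process a moment to print before the script exits
Process.sleep(100)
```

Running this as a script prints a single line from inside the spawned task process.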

To be able to query the database, we will import Ecto and require Logger for logging:

# /apps/data_warehouse/lib/data_warehouse/[Link]
...
import [Link], only: [from: 2]

require Logger
...

We can now modify the run/1 function to expect a specific type, symbol, from, to, and interval:

# /apps/data_warehouse/lib/data_warehouse/[Link]
...
def run(%{
      type: :trade_events,
      symbol: symbol,
      from: from,
      to: to,
      interval: interval
    }) do
...

Inside the body of the run/1 function, we will first convert from and to to Unix timestamps using a private helper function:

# /apps/data_warehouse/lib/data_warehouse/[Link]
...
def run(%{
      ...
    }) do
  from_ts =
    "#{from}T[Link].000Z"
    |> convert_to_ms()

  to_ts =
    "#{to}T[Link].000Z"
    |> convert_to_ms()
end
...
defp convert_to_ms(iso8601DateString) do
  iso8601DateString
  |> NaiveDateTime.from_iso8601!()
  |> DateTime.from_naive!("Etc/UTC")
  |> DateTime.to_unix()
  |> Kernel.*(1000)
end
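As a quick sanity check, the same standard library pipeline can be run on a hard-coded midnight date string (a sketch; it uses only Elixir's built-in date-time modules):

```elixir
# "2019-06-03" at midnight UTC, converted to a Unix timestamp in milliseconds,
# mirroring the convert_to_ms/1 pipeline above
ms =
  "2019-06-03T00:00:00.000Z"
  |> NaiveDateTime.from_iso8601!()
  |> DateTime.from_naive!("Etc/UTC")
  |> DateTime.to_unix()
  |> Kernel.*(1000)

IO.puts(ms)
# prints 1559520000000
```

Note that NaiveDateTime.from_iso8601!/1 accepts the trailing "Z" but discards the offset, which is why we explicitly re-attach the "Etc/UTC" time zone before converting to Unix time.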

Next, we will select data from the database. Because possibly hundreds of thousands of rows will
be selected, and because we broadcast them to the PubSub every x ms, broadcasting all of them could
take a substantial amount of time. Instead of selecting the data and keeping all of it in memory, we
will use the [Link]/1 function to keep broadcasting it on the go. Additionally, we will add an
index to the data to be able to log an info message every 10k events. The last thing that we need
to define is the timeout value - the default is 5 seconds, and we will change it to :infinity:

# /apps/data_warehouse/lib/data_warehouse/[Link]
def run(%{
      ...
    }) do
  ...

  [Link](
    fn ->
      from(te in [Link],
        where:
          [Link] == ^symbol and
            te.trade_time >= ^from_ts and
            te.trade_time < ^to_ts,
        order_by: te.trade_time
      )
      |> [Link]()
      |> Enum.with_index()
      |> [Link](fn {row, index} ->
        :[Link](interval)

        if rem(index, 10_000) == 0 do
          [Link]("Publisher broadcasted #{index} events")
        end

        publishTradeEvent(row)
      end)
    end,
    timeout: :infinity
  )

  [Link]("Publisher finished streaming trade events")
end

Finally, the above code uses the publishTradeEvent/1 helper function, which converts the
DataWarehouse's TradeEvent struct to the Streamer's TradeEvent struct so that we broadcast the
same structs as the streamer application:

# /apps/data_warehouse/lib/data_warehouse/[Link]
...
defp publishTradeEvent(%[Link]{} = trade_event) do
  new_trade_event =
    struct(
      [Link],
      trade_event |> Map.to_list()
    )

  [Link](
    [Link],
    "trade_events:#{trade_event.symbol}",
    new_trade_event
  )
end
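The struct/2 call above is a generic way of copying data between two structs that share field names - struct/2 ignores the :__struct__ key and any keys that do not exist in the target struct. A self-contained sketch of the same pattern, using hypothetical Source and Target modules:

```elixir
defmodule Source do
  defstruct [:symbol, :price]
end

defmodule Target do
  defstruct [:symbol, :price]
end

source = %Source{symbol: "NEOUSDT", price: "10.0"}

# :__struct__ and any unknown keys are dropped; shared fields are copied across
target = struct(Target, source |> Map.to_list())

IO.inspect(target)
```

The result is a %Target{} struct carrying the same symbol and price values as the source.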

This finishes our implementation - we should be able to stream trade events from the database to
the PubSub using the above Task, which we will do below.

Test the backtesting


For consistency and ease of testing/use, I prepared a compressed single day of trade events for
XRPUSDT (2019-06-03). We can download that file from GitHub using wget:

$ cd /tmp
$ wget [Link]
[Link]

We can now uncompress the archive and load those trade events into our database:

$ gunzip [Link]
$ PGPASSWORD=postgres psql -Upostgres -h localhost -ddata_warehouse -c "\COPY trade_events FROM '/tmp/[Link]' WITH (FORMAT csv, delimiter ';');"
COPY 206115

The number after the word COPY in the response indicates the number of rows that got copied into
the database.
We can now give it a try and run the full backtesting, but first, let's clean the orders table:

$ psql -Upostgres -h127.0.0.1
Password for user postgres:
...
postgres=# \c data_warehouse
You are now connected to database "data_warehouse" as user "postgres".
data_warehouse=# DELETE FROM orders;
DELETE ...

We can now start a new iex session where we will start trading (the naive application) as well as
storing orders (the data_warehouse application). Instead of starting the [Link] worker,
we will start the [Link] task with arguments matching the imported day and
symbol:

$ iex -S mix
...
iex(1)> DataWarehouse.start_storing("orders", "XRPUSDT")
[Link].596 [info] Starting storing data from orders:XRPUSDT topic
[Link].632 [info] DataWarehouse worker is subscribing to orders:XRPUSDT
{:ok, #PID<0.417.0>}
iex(2)> Naive.start_trading("XRPUSDT")
[Link].293 [info] Starting [Link] worker for XRPUSDT
[Link].332 [info] Starting new supervision tree to trade on XRPUSDT
{:ok, #PID<0.419.0>}
[Link].327 [info] Initializing new trader(1615288698325) for XRPUSDT
iex(3)> [Link].start_link(%{type: :trade_events, symbol: "XRPUSDT", from: "2019-06-02", to: "2019-06-04", interval: 5})
{:ok, #PID<0.428.0>}
[Link].532 [info] Publisher broadcasted 0 events
[Link].534 [info] The trader(1615288698325) is placing a BUY order for XRPUSDT @ 0.44391, quantity: 450.5
[Link].749 [info] The trader(1615288698325) is placing a SELL order for XRPUSDT @ 0.44426, quantity: 450.5.
...
[Link].568 [info] Publisher broadcasted 10000 events
...
[Link].571 [info] Publisher broadcasted 20000 events
[Link].576 [info] Publisher broadcasted 30000 events
...
[Link].875 [info] Publisher broadcasted 200000 events
[Link].576 [info] Publisher finished streaming trade events

From the above log, we can see that it took about 20 minutes to run 206k records through the
system (a lot of that time, 17+ minutes, was indeed the 5ms sleep).
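A back-of-the-envelope check confirms that claim - with 206,115 events (the COPY count above) and a 5 ms sleep per event, the sleeps alone account for roughly 17 minutes:

```elixir
events = 206_115
sleep_ms = 5

# Total time spent sleeping across the whole stream, in minutes
sleep_minutes = events * sleep_ms / 1000 / 60

IO.puts(Float.round(sleep_minutes, 1))
# prints 17.2
```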
After the streaming finished, we can check the orders table inside the database to figure out how
many trades we made and what income they generated.

$ psql -Upostgres -h127.0.0.1
Password for user postgres:
...
postgres=# \c data_warehouse
You are now connected to database "data_warehouse" as user "postgres".
data_warehouse=# SELECT COUNT(*) FROM orders;
 count
-------
   224
(1 row)

By looking at the orders, we can figure out some performance metrics, but that's a less than ideal
way to answer even simple questions like “what's the performance of my strategy?”. We will address
that and other concerns in future chapters.
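As a rough illustration of the kind of metric we could compute, here is the gross profit of the single BUY/SELL pair visible in the log above (a sketch with hard-coded values taken from that log, ignoring exchange fees):

```elixir
# Values from the trader log: bought and then sold 450.5 XRP
buy_price = 0.44391
sell_price = 0.44426
quantity = 450.5

# Roughly 0.158 USDT of gross profit before fees
gross_profit = (sell_price - buy_price) * quantity

IO.puts(Float.round(gross_profit, 4))
```

Doing this by hand for 224 orders is exactly the tedium that a proper reporting step will remove later.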
[Note] Please remember to run mix format to keep things nice and tidy.
Source code for this chapter can be found at GitHub⁵³

⁵³[Link]
