API
API
import logging
import asyncio
from datetime import datetime
from . import expiration
from . import global_value
from .api import QuotexAPI
from .utils.services import truncate
from .utils.processor import (
calculate_candles,
process_candles_v2,
merge_candles,
process_tick
)
from .config import (
load_session,
update_session,
resource_path,
credentials
)
from .utils.indicators import TechnicalIndicators
logger = logging.getLogger(__name__)
class Quotex:
def __init__(
self,
email=None,
password=None,
lang="pt",
user_agent="Quotex/1.0",
root_path=".",
user_data_dir="browser",
asset_default="EURUSD",
period_default=60
):
self.size = [
1,
5,
10,
15,
30,
60,
120,
300,
600,
900,
1800,
3600,
7200,
14400,
86400,
]
self.email = email
self.password = password
self.lang = lang
self.resource_path = root_path
self.user_data_dir = user_data_dir
self.asset_default = asset_default
self.period_default = period_default
self.subscribe_candle = []
self.subscribe_candle_all_size = []
self.subscribe_mood = []
self.account_is_demo = 1
self.suspend = 0.2
self.codes_asset = {}
self.api = None
self.duration = None
self.websocket_client = None
self.websocket_thread = None
self.debug_ws_enable = False
self.resource_path = resource_path(root_path)
session = load_session(user_agent)
self.session_data = session
if not email or not password:
self.email, self.password = credentials()
@property
def websocket(self):
"""Property to get websocket.
:returns: The instance of :class:`WebSocket <websocket.WebSocket>`.
"""
return self.websocket_client.wss
@staticmethod
async def check_connect():
await asyncio.sleep(2)
if global_value.check_accepted_connection == 1:
return True
return False
def set_session(self, user_agent: str, cookies: str = None, ssid: str = None):
session = {
"cookies": cookies,
"token": ssid,
"user_agent": user_agent
}
self.session_data = update_session(session)
def get_all_asset_name(self):
if self.api.instruments:
return [[i[1], i[2].replace("\n", "")] for i in self.api.instruments]
return self.codes_asset
return candles
Args:
asset (str): Asset name.
period (int): Period for fetching candles.
Returns:
list: List of prepared candles data.
"""
candles_data = calculate_candles(self.api.candles.candles_data, period)
candles_v2_data = process_candles_v2(self.api.candle_v2_data, asset,
candles_data)
new_candles = merge_candles(candles_v2_data)
return new_candles
if not self.session_data.get("token"):
await self.api.authenticate()
check, reason = await self.api.connect(self.account_is_demo)
Args:
asset (str): Nombre del activo (ej: "EURUSD")
indicator (str): Nombre del indicador
params (dict): Parámetros específicos del indicador
history_size (int): Tamaño del histórico en segundos
timeframe (int): Temporalidad en segundos. Valores posibles:
- 60: 1 minuto
- 300: 5 minutos
- 900: 15 minutos
- 1800: 30 minutos
- 3600: 1 hora
- 7200: 2 horas
- 14400: 4 horas
- 86400: 1 día
"""
# Validar timeframe
valid_timeframes = [60, 300, 900, 1800, 3600, 7200, 14400, 86400]
if timeframe not in valid_timeframes:
return {"error": f"Timeframe no válido. Valores permitidos:
{valid_timeframes}"}
if not candles:
return {"error": f"No hay datos disponibles para el activo {asset}"}
indicators = TechnicalIndicators()
indicator = indicator.upper()
try:
# RSI
if indicator == "RSI":
period = params.get("period", 14)
values = indicators.calculate_rsi(prices, period)
return {
"rsi": values,
"current": values[-1] if values else None,
"history_size": len(values),
"timeframe": timeframe,
"timestamps": timestamps[-len(values):] if values else []
}
# MACD
elif indicator == "MACD":
fast_period = params.get("fast_period", 12)
slow_period = params.get("slow_period", 26)
signal_period = params.get("signal_period", 9)
macd_data = indicators.calculate_macd(prices, fast_period,
slow_period, signal_period)
macd_data["timeframe"] = timeframe
macd_data["timestamps"] = timestamps[-len(macd_data["macd"]):] if
macd_data["macd"] else []
return macd_data
# SMA
elif indicator == "SMA":
period = params.get("period", 20)
values = indicators.calculate_sma(prices, period)
return {
"sma": values,
"current": values[-1] if values else None,
"history_size": len(values),
"timeframe": timeframe,
"timestamps": timestamps[-len(values):] if values else []
}
# EMA
elif indicator == "EMA":
period = params.get("period", 20)
values = indicators.calculate_ema(prices, period)
return {
"ema": values,
"current": values[-1] if values else None,
"history_size": len(values),
"timeframe": timeframe,
"timestamps": timestamps[-len(values):] if values else []
}
# BOLLINGER
elif indicator == "BOLLINGER":
period = params.get("period", 20)
num_std = params.get("std", 2)
bb_data = indicators.calculate_bollinger_bands(prices, period,
num_std)
bb_data["timeframe"] = timeframe
bb_data["timestamps"] = timestamps[-len(bb_data["middle"]):] if
bb_data["middle"] else []
return bb_data
# STOCHASTIC
elif indicator == "STOCHASTIC":
k_period = params.get("k_period", 14)
d_period = params.get("d_period", 3)
stoch_data = indicators.calculate_stochastic(prices, highs, lows,
k_period, d_period)
stoch_data["timeframe"] = timeframe
stoch_data["timestamps"] = timestamps[-len(stoch_data["k"]):] if
stoch_data["k"] else []
return stoch_data
# ATR
elif indicator == "ATR":
period = params.get("period", 14)
values = indicators.calculate_atr(highs, lows, prices, period)
return {
"atr": values,
"current": values[-1] if values else None,
"history_size": len(values),
"timeframe": timeframe,
"timestamps": timestamps[-len(values):] if values else []
}
# ADX
elif indicator == "ADX":
period = params.get("period", 14)
adx_data = indicators.calculate_adx(highs, lows, prices, period)
adx_data["timeframe"] = timeframe
adx_data["timestamps"] = timestamps[-len(adx_data["adx"]):] if
adx_data["adx"] else []
return adx_data
# ICHIMOKU
elif indicator == "ICHIMOKU":
tenkan_period = params.get("tenkan_period", 9)
kijun_period = params.get("kijun_period", 26)
senkou_b_period = params.get("senkou_b_period", 52)
ichimoku_data = indicators.calculate_ichimoku(highs, lows,
tenkan_period, kijun_period, senkou_b_period)
ichimoku_data["timeframe"] = timeframe
ichimoku_data["timestamps"] = timestamps[-
len(ichimoku_data["tenkan"]):] if ichimoku_data[
"tenkan"] else []
return ichimoku_data
else:
return {"error": f"Indicador '{indicator}' no soportado"}
except Exception as e:
return {"error": f"Error calculando el indicador: {str(e)}"}
Args:
asset (str): Nombre del activo
indicator (str): Nombre del indicador
params (dict): Parámetros del indicador
callback (callable): Función que se llamará con cada actualización
timeframe (int): Temporalidad en segundos
"""
if not callback:
raise ValueError("Debe proporcionar una función callback")
# Validar timeframe
valid_timeframes = [60, 300, 900, 1800, 3600, 7200, 14400, 86400]
if timeframe not in valid_timeframes:
raise ValueError(f"Timeframe no válido. Valores permitidos:
{valid_timeframes}")
try:
# Iniciar stream de velas
self.start_candles_stream(asset, timeframe)
while True:
try:
# Obtener velas en tiempo real
real_time_candles = await self.get_realtime_candles(asset,
timeframe)
if real_time_candles:
# Convertir el diccionario a lista ordenada por tiempo
candles_list = sorted(real_time_candles.items(), key=lambda
x: x[0])
indicators = TechnicalIndicators()
indicator = indicator.upper()
if indicator == "RSI":
period = params.get("period", 14)
values = indicators.calculate_rsi(prices, period)
result["value"] = values[-1] if values else None
result["all_values"] = values
result["indicator"] = "RSI"
# BOLLINGER
elif indicator == "BOLLINGER":
period = params.get("period", 20)
num_std = params.get("std", 2)
bb_data = indicators.calculate_bollinger_bands(prices,
period, num_std)
result["value"] = bb_data["current"]
result["all_values"] = bb_data
# STOCHASTIC
elif indicator == "STOCHASTIC":
k_period = params.get("k_period", 14)
d_period = params.get("d_period", 3)
stoch_data = indicators.calculate_stochastic(prices,
highs, lows, k_period, d_period)
result["value"] = stoch_data["current"]
result["all_values"] = stoch_data
# ADX
elif indicator == "ADX":
period = params.get("period", 14)
adx_data = indicators.calculate_adx(highs, lows,
prices, period)
result["value"] = adx_data["current"]
result["all_values"] = adx_data
# ATR
elif indicator == "ATR":
period = params.get("period", 14)
values = indicators.calculate_atr(highs, lows, prices,
period)
result["value"] = values[-1] if values else None
result["all_values"] = values
# ICHIMOKU
elif indicator == "ICHIMOKU":
tenkan_period = params.get("tenkan_period", 9)
kijun_period = params.get("kijun_period", 26)
senkou_b_period = params.get("senkou_b_period", 52)
ichimoku_data = indicators.calculate_ichimoku(highs,
lows, tenkan_period, kijun_period,
senkou_b_period)
result["value"] = ichimoku_data["current"]
result["all_values"] = ichimoku_data
else:
result["error"] = f"Indicador '{indicator}' no
soportado para tiempo real"
except Exception as e:
print(f"Error en la suscripción: {str(e)}")
await asyncio.sleep(1)
except Exception as e:
logger.error(f"Error en la suscripción: {str(e)}")
finally:
# Limpiar suscripciones al salir
try:
self.stop_candles_stream(asset)
except:
pass
Returns:
The trading history from the API.
"""
account_type = "demo" if self.account_is_demo else "live"
return await self.api.get_trader_history(account_type, page_number=1)
async def buy(self, amount: float, asset: str, direction: str, duration: int,
time_mode: str = "TIME"):
"""
Buy Binary option
Args:
amount (float): Amount to buy.
asset (str): Asset to buy.
direction (str): Direction to buy.
duration (int): Duration to buy.
time_mode (str): Time mode to buy.
Returns:
The buy result.
"""
self.api.buy_id = None
request_id = expiration.get_timestamp()
is_fast_option = time_mode.upper() == "TIME"
self.start_candles_stream(asset, duration)
self.api.buy(amount, asset, direction, duration, request_id,
is_fast_option)
count = 0.1
while self.api.buy_id is None:
count += 0.1
if count > duration:
status_buy = False
break
await asyncio.sleep(0.2)
if global_value.check_websocket_if_error:
return False, global_value.websocket_error_reason
else:
status_buy = True
def get_payment(self):
"""Payment Quotex server"""
assets_data = {}
for i in self.api.instruments:
assets_data[i[2].replace("\n", "")] = {
"turbo_payment": i[18],
"payment": i[5],
"profit": {
"1M": i[-9],
"5M": i[-8]
},
"open": i[14]
}
return assets_data
data = assets_data.get(asset_name)
if timeframe == "all":
return data.get("profit")
return data.get("profit").get(f"{timeframe}M")
Args:
asset (str): The asset to stream data for.
period (int, optional): The period for the candles. Defaults to 0.
"""
self.api.current_asset = asset
self.api.subscribe_realtime_candle(asset, period)
self.api.chart_notification(asset)
self.api.follow_candle(asset)
This function sets up trading parameters for the specified asset, including
the period,
deal amount, and percentage mode if applicable. It then waits for the
updated investment
settings to be available and returns them.
Args:
asset (str): The asset for which to apply the settings.
period (int, optional): The trading period in seconds. Defaults to 0.
time_mode (bool, optional): Whether to switch time mode. Defaults to
False.
deal (float, optional): The fixed amount for each deal. Defaults to 5.
percent_mode (bool, optional): Whether to enable percentage-based
deals. Defaults to False.
percent_deal (float, optional): The percentage value for percentage-
based deals. Defaults to 1.
Returns:
dict: The updated investment settings for the specified asset.
Raises:
ValueError: If the investment settings cannot be retrieved after
multiple attempts.
Notes:
- This function continuously refreshes the settings until they are
available.
- A sleep interval is used to prevent excessive API calls.
"""
is_fast_option = False if time_mode.upper() == "TIMER" else True
self.api.current_asset = asset
self.api.settings_apply(
asset,
period,
is_fast_option=is_fast_option,
deal=deal,
percent_mode=percent_mode,
percent_deal=percent_deal
)
await asyncio.sleep(0.2)
while True:
self.api.refresh_settings()
if self.api.settings_list:
investments_settings = self.api.settings_list
break
await asyncio.sleep(0.2)
return investments_settings
def start_signals_data(self):
self.api.signals_subscribe()
Args:
asset (str): The asset to get candle data for.
period (int, optional): The period for the candles. Defaults to 0.
Returns:
dict: A dictionary of real-time candle data.
"""
data = {}
self.start_candles_stream(asset, period)
while True:
if self.api.realtime_price.get(asset):
tick = self.api.realtime_candles
return process_tick(tick, period, data)
await asyncio.sleep(0.1)
def get_signal_data(self):
return self.api.signal_data
def get_profit(self):
return self.api.profit_in_operation or 0
Args:
operation_id (str): The ID of the trade to check.
Returns:
str: win if the trade is a win, loss otherwise.
float: The profit from operations; returns 0 if no profit is recorded.
"""
data_history = await self.get_history()
for item in data_history:
if item.get("ticket") == operation_id:
profit = float(item.get("profitAmount", 0))
status = "win" if profit > 0 else "loss"
return status, item