14
0
0
Скопировать ссылку
Telegram
WhatsApp
Vkontakte
Одноклассники
Назад

Как я научил торгового бота рисовать свечные графики и перестал спамить текстом

Время чтения 91 минута
Нет времени читать?
Скопировать ссылку
Telegram
WhatsApp
Vkontakte
Одноклассники
14
0
0
Нет времени читать?
Скопировать ссылку
Telegram
WhatsApp
Vkontakte
Одноклассники

Всем привет! Меня зовут Николай Пискунов, я руководитель направления Big Data и эксперт курса Cloud DevSecOps по безопасной разработке от Академии вАЙТИ. Сегодня расскажу о разработке системы, которая строит свечные графики для трейдинг-бота на Python. Это полноценный инструмент анализа, который помогает принимать торговые решения в реальном времени. Важная часть этой системы — быстрая связь с пользователем через бота в Телеграме. 

Когда бот начинает генерировать десятки сигналов в день, становится очевидно, что текстовые уведомления в Телеграме типа *«EUR/USD OTC, BUY, цена 1.08542» — это мусор. Трейдеру нужно видеть контекст: что происходило на графике перед сигналом, какой паттерн сформировался, где находится уровень входа.

В этой статье я подробно, с кусками реального рабочего кода, расскажу, как построил систему визуализации свечных графиков. Она сама собирает тики, агрегирует их в свечи, хранит историю, а в нужный момент генерирует картинку и отправляет ее в Телеграм. И всё это — в реальном времени и с минимальными задержками.

Как я научил торгового бота рисовать свечные графики и перестал спамить текстом

Проблема и решение: как превратить текстовый спам в полезное сообщение 

Исходная система была простой: WebSocket-клиент получал тики, детектор паттернов находил «поглощение» или «стохастик» и отправлял сообщение. Вот как это выглядело:

📈 СИГНАЛ: EUR/USD OTC (15s) – BUY

Полезной информации — ноль. Пользователь не видел:

  • графика перед сигналом (был ли тренд?);
  • формы свечи, которая дала сигнал;
  • уровня входа относительно ближайших максимумов/минимумов.

Нужно было сделать так, чтобы бот:

  1. Автоматически рисовал график при обнаружении сигнала.
  2. Делал это быстро (менее 300 мс), чтобы не тормозить основной поток анализа.
  3. Отправлял график в Телеграм и сразу удалял его с сервера, чтобы не забивать место.
  4. Работал для любых таймфреймов — от 15 секунд до 1 дня.

Звучит как отдельный микросервис, но на деле всё решается грамотной архитектурой в рамках одного приложения. Поехали!

Шаг 1. Архитектурная особенность: два кеша, чтобы не смешивать анализ и визуализацию

Первая и самая важная архитектурная ошибка, которую я совершил в первой версии и хотел исправить в этой, — хранить все свечи в одном месте. Анализу нужны только свежие данные (последние 20–50 свечей), а для графика — больше истории (до 500 свечей), и она может быть старой.

Смешивать их — значит, засорять память и замедлять анализ. Решение: два отдельных кеша внутри главного класса монитора.


python
 
class AdvancedMonitor:
def __init__(self, config: Dict):
     # ... инициализация других компонентов ...
     # Кеш для АНАЛИЗА: только свежие свечи. Храним по правилам из конфига.
     self.analysis_candles = {}

     # Кеш для ГРАФИКОВ: максимум истории. Храним до 500 свечей.
     self.chart_candles = {}
     self.max_chart_candles = 500
 
async def _on_tik_received(self, tik: Dict):
     # ... обработка тика и создание свечи (aggregated_candle) ...
     # Дальше — магия разделения
     key = f"{pair}_{timeframe}"

     # 1. Сохраняем ВСЁ в кеш для графиков
     if key not in self.chart_candles:
         self.chart_candles[key] = []
     self.chart_candles[key].append(aggregated_candle)

     # Ограничиваем длину для экономии памяти
     if len(self.chart_candles[key]) > self.max_chart_candles:
         self.chart_candles[key] = self.chart_candles[key][-self.max_chart_candles:]

      # 2. Сохраняем только СВЕЖЕЕ в кеш для анализа
     current_time = datetime.now().timestamp()
     candle_age = current_time - aggregated_candle.get('timestamp', 0)
     timeframe_max_age = self.get_max_candle_age_for_timeframe(timeframe) # 45 сек. для 15s
 
     if candle_age <= timeframe_max_age:
         if key not in self.analysis_candles:
             self.analysis_candles[key] = []
         self.analysis_candles[key].append(aggregated_candle)

         # Ограничиваем длину для анализа (из конфига)
         max_store = self.get_max_candles_to_store(timeframe) # 400 для 15s
         if len(self.analysis_candles[key]) > max_store:
             self.analysis_candles[key] = self.analysis_candles[key][-max_store:]

 

Зачем так сложно?

Анализ теперь работает с небольшим, очень релевантным набором данных, что повышает скорость проверки паттернов.

Визуализация имеет доступ к более глубокой истории, чтобы нарисовать красивый график с контекстом. Они не мешают друг другу.

Шаг 2. Агрегатор в реальном времени переводит тики в свечи

На вход мы получаем от WebSocket API поток сырых тиков (цен). Нам нужно самим собирать из них свечи. Для этого я написал класс RealTimeCandleAggregator. Он получает на вход каждый тик, определяет, к какой свече по времени он относится, и обновляет ее OHLCV (Open, High, Low, Close, Volume).

Важный нюанс: таймфрейм может быть секундным (15s, 30s), поэтому нужно аккуратно округлять время.


python
class RealTimeCandleAggregator:

# ... инициализация ...
def add_tick(self, pair: str, timeframe: str, tick: Dict) -> Optional[Dict]:
     tick_time = datetime.fromtimestamp(tick.get('timestamp', 0))

     # Получаем время начала свечи для этого тика
     candle_start = self._get_candle_start_time(tick_time, timeframe)

     # Ключ для хранения тиков текущей свечи
     key = f"{pair}_{timeframe}"

     # Если началась новая свеча, закрываем старую и создаем новую
     if key not in self.current_candle_start or candle_start > self.current_candle_start[key]:

         # Формируем свечу из накопленных тиков
         finished_candle = self._create_candle_from_ticks(key, pair, timeframe)

         # Сбрасываем буфер для новой свечи
         self.current_candle_start[key] = candle_start
         self.ticks_storage[key] = deque()
         self.ticks_storage[key].append(tick)
 
         if finished_candle:
             return finished_candle # Возвращаем ЗАКОНЧИВШУЮСЯ свечу
         else:
             return None

     # Добавляем тик в текущую свечу
     self.ticks_storage[key].append(tick)
     return None # Свеча еще не закрыта
def _get_candle_start_time(self, tick_time: datetime, timeframe: str) -> datetime:
     seconds = self.timeframe_seconds.get(timeframe, 60)
     if seconds < 60:

         # Для секундных таймфреймов
         seconds_floor = (tick_time.second // seconds) * seconds
         return tick_time.replace(second=seconds_floor, microsecond=0)
     else:

         # Для минутных и часовых
         # ... логика округления минут ...
         pass

 

Как это работает в связке:

  1. В WebSocket-клиенте на каждый входящий тик вызывается _on_tik_received.
  2. Этот метод передает тик в candle_aggregator.add_tick().
  3. Если агрегатор возвращает свечу (значит, старый интервал закончился), мы отправляем ее в обработку.

Шаг 3. Сердце визуализации — класс CandleChartMaker

Когда агрегатор возвращает свечу, она попадает в оба кеша (для анализа и графиков). А когда детектор паттернов находит сигнал, нам нужно создать график. Вся логика по работе с matplotlib и mplfinance вынесена в отдельный класс CandleChartMaker.

Почему mplfinance? Это надстройка над matplotlib, специально заточенная под финансовые графики. Она из коробки умеет рисовать японские свечи, настраивать цвета для бычьих/медвежьих свечей, добавлять объемы и делать это красиво.

Вот упрощенный, но полностью рабочий код этого класса, который лежит в основе статьи:


python
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
 
import pandas as pd
import mplfinance as mpf
import matplotlib.pyplot as plt
 
logger = logging.getLogger(__name__)
 
class CandleChartMaker:
"""Генератор красивых свечных графиков для Телеграма."""
 
def __init__(self, plot_dir: str = "logs/tmp/plots"):
     self.plot_dir = Path(plot_dir)
     self.plot_dir.mkdir(parents=True, exist_ok=True)
 
def create_chart_from_cache(self, pair: str, timeframe: str, candles: List[Dict],
                             signal_type: str = None, signal_price: float = None) -> Optional[str]:
     """
     Основной метод для создания графика.
     """
     if not candles or len(candles) < 5:
         logger.warning(f"Недостаточно данных для графика {pair}")
         return None

     # 1. Превращаем список свечей в DataFrame для mplfinance
     df = self._prepare_dataframe(candles)
     if len(df) < 2:
         return None

     # 2. Генерируем уникальное имя файла
     timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3]
     safe_pair = pair.replace('/', '_').replace(' ', '_')
     filename = f"{safe_pair}_{timeframe}_{signal_type}_{timestamp}.png"
     plot_path = self.plot_dir / filename

     # 3. Пытаемся нарисовать график через mplfinance
     success = self._create_mplfinance_chart(df, pair, timeframe, plot_path, signal_type, signal_price)
 
     if success:
         logger.info(f" График готов: {plot_path}")
         return str(plot_path)
     else:
         logger.error(f" Не удалось создать график для {pair}")
         return None
 
def _prepare_dataframe(self, candles: List[Dict]) -> pd.DataFrame:
     """Конвертирует список свечей в DataFrame, готовый для mplfinance."""
     data = []
     for c in candles:

         # Убеждаемся, что все ключи есть, и конвертируем время
         if not all(k in c for k in ['datetime', 'open', 'high', 'low', 'close']):
             continue
         try:
             dt = pd.to_datetime(c['datetime'])
             data.append({
                 'datetime': dt,
                 'open': float(c['open']),
                 'high': float(c['high']),
                 'low': float(c['low']),
                 'close': float(c['close']),
                 'volume': float(c.get('volume', 0)),
             })
         except (ValueError, TypeError) as e:
             logger.debug(f"Ошибка парсинга свечи: {e}")
             continue
 
     if not data:
         return pd.DataFrame()
 
     df = pd.DataFrame(data)
     df = df.sort_values('datetime')

     # mplfinance требует, чтобы индексом был datetime
     df.set_index('datetime', inplace=True)
     return df
 
def _create_mplfinance_chart(self, df: pd.DataFrame, pair: str, timeframe: str,
                               output_path: Path, signal_type: str = None,
                               signal_price: float = None) -> bool:
     """Внутренний метод для рисования с помощью mplfinance."""
     try:

         # Настройка цветовой схемы
         mc = mpf.make_marketcolors(
             up='#26a69a', # Зеленый для бычьих
             down='#ef5350',  # Красный для медвежьих
             wick='inherit',
             volume='in',
             edge='inherit'
         )
         s = mpf.make_mpf_style(marketcolors=mc, gridstyle='--', y_on_right=False)

         # Будем добавлять дополнительные линии на график
         addplots = []

         # 1. Линия цены входа (сигнала)
         if signal_price is not None:
             signal_line = [signal_price] * len(df)
             color = 'green' if signal_type and 'BUY' in signal_type.upper() else 'red'
             label = f"Вход: {signal_price:.5f}"
             ap = mpf.make_addplot(signal_line, color=color, linestyle='--', width=1, label=label)
             addplots.append(ap)

         # 2. Скользящие средние (если хватает данных)
         if len(df) >= 10:
             ma10 = df['close'].rolling(10).mean()
             ap_ma10 = mpf.make_addplot(ma10, color='orange', width=0.8, label='MA10')
             addplots.append(ap_ma10)
         if len(df) >= 20:
             ma20 = df['close'].rolling(20).mean()
             ap_ma20 = mpf.make_addplot(ma20, color='blue', width=0.8, label='MA20')
             addplots.append(ap_ma20)

         # Формируем заголовок
         title = f"{pair} ({timeframe})"
         if signal_type:
             title += f" — {signal_type} Сигнал"

         # Создаем фигуру. Параметр `returnfig=True` дает нам доступ к объекту Figure
         fig, axes = mpf.plot(
             df,
             type='candle',
             style=s,
             title=title,
             ylabel='Цена',
             volume=True,  # Показываем объем
             addplot=addplots,
             figsize=(12, 7),
             panel_ratios=(3, 1),  # Соотношение графика свечей и объема
             returnfig=True,
             tight_layout=True
         )

         # Добавляем немного статистики в правый верхний угол
         self._add_stats_text(fig, df, pair)

         # Сохраняем
         fig.savefig(output_path, dpi=120, bbox_inches='tight')
         plt.close(fig)
         return True
 
     except Exception as e:
         logger.error(f"Ошибка mplfinance: {e}", exc_info=True)
         return False
 
def _add_stats_text(self, fig, df: pd.DataFrame, pair: str):
     """Добавляет текстовую статистику на график."""
     last_candle = df.iloc[-1]
     price_change = last_candle['close'] - df.iloc[0]['open']
     price_change_pct = (price_change / df.iloc[0]['open']) * 100

     # Считаем бычьи/медвежьи свечи на графике
     bullish = (df['close'] >= df['open']).sum()
     bearish = len(df) - bullish
 
     stats_text = (
         f" {pair}n"
         f"Close: {last_candle['close']:.5f}n"
         f"Change: {price_change:+.5f} ({price_change_pct:+.2f}%)n"
         f"Bull/Bear: {bullish}/{bearish}"
     )

     # Размещаем текст в координатах фигуры
     fig.text(0.83, 0.85, stats_text, fontsize=8,
              bbox=dict(boxstyle="round,pad=0.3", facecolor="white", alpha=0.8))
 
def cleanup_old_plots(self, max_age_hours: int = 1):

     """Удаляет старые графики, чтобы не забивать диск."""
     # ... логика удаления файлов ...
     pass

 

Шаг 4. Интеграция с Телеграмом

Последний шаг — самый простой и приятный. Когда сигнал обнаружен и график создан, вызываем метод нашего TelegramBotRunner (который внутри использует aiogram), передаем ему путь к картинке и данные сигнала.


python
async def _send_signal_to_subscribers(self, signal: Dict, candles_data: List[Dict] = None):
if not self.telegram_manager:
     return
try:

     # 1. Формируем данные для сообщения
     signal_data = {
         'pair': signal['pair'],
         'direction': 'buy' if signal['direction'] == 'bullish' else 'sell',
         'confidence': signal['confidence'],
         'timeframe': signal['timeframe'],
         'price': signal['current_price'],
         'pattern': signal['pattern'],
         'strategy': signal.get('strategy', 'unknown'),
         'expiry_minutes': signal.get('expiry_minutes', 2) # Время экспирации
     }

     # 2. СОЗДАЕМ ГРАФИК, используя данные из chart_candles!
     chart_path = None

     # Берем последние 50 свечей из кеша для графиков
     chart_candles = self.get_candles_for_chart(signal['pair'], signal['timeframe'], 50)
     if chart_candles:
         try:
             signal_type = 'BUY' if signal['direction'] == 'bullish' else 'SELL'
             chart_path = self.chart_maker.create_chart_from_cache(
                 pair=signal['pair'],
                 timeframe=signal['timeframe'],
                 candles=chart_candles,
                 signal_type=signal_type,
                 signal_price=float(signal['current_price'])
             )
         except Exception as e:
             logger.error(f"Ошибка генерации графика: {e}")

     # 3. Отправляем в Телеграм
     await self.telegram_manager.send_signal(signal_data, chart_path)

     # 4. Удаляем временный файл графика (менеджер сделает это сам)
     # ...
except Exception as e:
     logger.error(f"Ошибка отправки сигнала: {e}")

 

Результат: что видит пользователь

Вот что в итоге получает пользователь в Телеграме. Вместо скучной строчки — полноценный анализ.

🔴 ТОРГОВЫЙ СИГНАЛ 🔴

Пара: EUR/RUB OTC

Стратегия: Engulfing

Направление: SELL

Таймфрейм: 15s

Экспирация: 2 мин.

Цена: 85.14383

Паттерн: engulfing

Уверенность: 🟢 HIGH

Именно это и нужно трейдеру для принятия решения.

Подводные камни и их решение

В процессе разработки я столкнулся с несколькими проблемами, о которых стоит упомянуть:

  1. Дубликаты данных. При переподключении к WebSocket или при загрузке истории могли приходить уже обработанные тики. Это приводило к искажению свечей. Решение — дедупликатор, который хранит ключи (пара + таймфрейм + время + цена) и отсеивает повторы.
  2. Сбой mplfinance. Библиотека отличная, но, как и любой код, иногда падает с неочевидными ошибками. На этот случай у меня был план Б — резервный метод _create_quick_chart, который рисовал примитивный график через чистый matplotlib. Система должна быть отказоустойчивой.
  3. Утечка памяти. Если генерировать по 1000 графиков в день и не удалять их, диск быстро забьется. Метод cleanup_old_plots решает эту проблему, оставляя графики жить не более часа.

Заключение

В результате получилась самодостаточная, надежная система визуализации — не просто придаток, а полноценная часть торгового робота. Результаты, на мой взгляд, хорошие, вот какие метрики я получил на реальных данных:

  • Время создания графика: 100–300 мс.
  • Размер файла PNG: 50–100 КБ.
  • Память на 1000 графиков: ~100 МБ.
  • Поддерживаемые пары: 50+ одновременно.

Комментарии0
Тоже интересно
Комментировать
Поделиться
Скопировать ссылку
Telegram
WhatsApp
Vkontakte
Одноклассники