Проблема и решение: как превратить текстовый спам в полезное сообщение
Исходная система была простой: WebSocket-клиент получал тики, детектор паттернов находил «поглощение» или «стохастик» и отправлял сообщение. Вот как это выглядело:
📈 СИГНАЛ: EUR/USD OTC (15s) – BUY
Полезной информации — ноль. Пользователь не видел:
- графика перед сигналом (был ли тренд?);
- формы свечи, которая дала сигнал;
- уровня входа относительно ближайших максимумов/минимумов.
Нужно было сделать так, чтобы бот:
- Автоматически рисовал график при обнаружении сигнала.
- Делал это быстро (менее 300 мс), чтобы не тормозить основной поток анализа.
- Отправлял график в Телеграм и сразу удалял его с сервера, чтобы не забивать место.
- Работал для любых таймфреймов — от 15 секунд до 1 дня.
Звучит как отдельный микросервис, но на деле всё решается грамотной архитектурой в рамках одного приложения. Поехали!
Шаг 1. Архитектурная особенность: два кеша, чтобы не смешивать анализ и визуализацию
Первая и самая важная архитектурная ошибка, которую я совершил в первой версии и хотел исправить в этой, — хранить все свечи в одном месте. Анализу нужны только свежие данные (последние 20–50 свечей), а для графика — больше истории (до 500 свечей), и она может быть старой.
Смешивать их — значит, засорять память и замедлять анализ. Решение: два отдельных кеша внутри главного класса монитора.
python
class AdvancedMonitor:
def __init__(self, config: Dict):
# ... инициализация других компонентов ...
# Кеш для АНАЛИЗА: только свежие свечи. Храним по правилам из конфига.
self.analysis_candles = {}
# Кеш для ГРАФИКОВ: максимум истории. Храним до 500 свечей.
self.chart_candles = {}
self.max_chart_candles = 500
async def _on_tik_received(self, tik: Dict):
# ... обработка тика и создание свечи (aggregated_candle) ...
# Дальше — магия разделения
key = f"{pair}_{timeframe}"
# 1. Сохраняем ВСЁ в кеш для графиков
if key not in self.chart_candles:
self.chart_candles[key] = []
self.chart_candles[key].append(aggregated_candle)
# Ограничиваем длину для экономии памяти
if len(self.chart_candles[key]) > self.max_chart_candles:
self.chart_candles[key] = self.chart_candles[key][-self.max_chart_candles:]
# 2. Сохраняем только СВЕЖЕЕ в кеш для анализа
current_time = datetime.now().timestamp()
candle_age = current_time - aggregated_candle.get('timestamp', 0)
timeframe_max_age = self.get_max_candle_age_for_timeframe(timeframe) # 45 сек. для 15s
if candle_age <= timeframe_max_age:
if key not in self.analysis_candles:
self.analysis_candles[key] = []
self.analysis_candles[key].append(aggregated_candle)
# Ограничиваем длину для анализа (из конфига)
max_store = self.get_max_candles_to_store(timeframe) # 400 для 15s
if len(self.analysis_candles[key]) > max_store:
self.analysis_candles[key] = self.analysis_candles[key][-max_store:]
Зачем так сложно?
Анализ теперь работает с небольшим, очень релевантным набором данных, что повышает скорость проверки паттернов.
Визуализация имеет доступ к более глубокой истории, чтобы нарисовать красивый график с контекстом. Они не мешают друг другу.
Шаг 2. Агрегатор в реальном времени переводит тики в свечи
На вход мы получаем от WebSocket API поток сырых тиков (цен). Нам нужно самим собирать из них свечи. Для этого я написал класс RealTimeCandleAggregator. Он получает на вход каждый тик, определяет, к какой свече по времени он относится, и обновляет ее OHLCV (Open, High, Low, Close, Volume).
Важный нюанс: таймфрейм может быть секундным (15s, 30s), поэтому нужно аккуратно округлять время.
python
class RealTimeCandleAggregator:
# ... инициализация ...
def add_tick(self, pair: str, timeframe: str, tick: Dict) -> Optional[Dict]:
tick_time = datetime.fromtimestamp(tick.get('timestamp', 0))
# Получаем время начала свечи для этого тика
candle_start = self._get_candle_start_time(tick_time, timeframe)
# Ключ для хранения тиков текущей свечи
key = f"{pair}_{timeframe}"
# Если началась новая свеча, закрываем старую и создаем новую
if key not in self.current_candle_start or candle_start > self.current_candle_start[key]:
# Формируем свечу из накопленных тиков
finished_candle = self._create_candle_from_ticks(key, pair, timeframe)
# Сбрасываем буфер для новой свечи
self.current_candle_start[key] = candle_start
self.ticks_storage[key] = deque()
self.ticks_storage[key].append(tick)
if finished_candle:
return finished_candle # Возвращаем ЗАКОНЧИВШУЮСЯ свечу
else:
return None
# Добавляем тик в текущую свечу
self.ticks_storage[key].append(tick)
return None # Свеча еще не закрыта
def _get_candle_start_time(self, tick_time: datetime, timeframe: str) -> datetime:
seconds = self.timeframe_seconds.get(timeframe, 60)
if seconds < 60:
# Для секундных таймфреймов
seconds_floor = (tick_time.second // seconds) * seconds
return tick_time.replace(second=seconds_floor, microsecond=0)
else:
# Для минутных и часовых
# ... логика округления минут ...
pass
Как это работает в связке:
- В WebSocket-клиенте на каждый входящий тик вызывается _on_tik_received.
- Этот метод передает тик в candle_aggregator.add_tick().
- Если агрегатор возвращает свечу (значит, старый интервал закончился), мы отправляем ее в обработку.
Шаг 3. Сердце визуализации — класс CandleChartMaker
Когда агрегатор возвращает свечу, она попадает в оба кеша (для анализа и графиков). А когда детектор паттернов находит сигнал, нам нужно создать график. Вся логика по работе с matplotlib и mplfinance вынесена в отдельный класс CandleChartMaker.
Почему mplfinance? Это надстройка над matplotlib, специально заточенная под финансовые графики. Она из коробки умеет рисовать японские свечи, настраивать цвета для бычьих/медвежьих свечей, добавлять объемы и делать это красиво.
Вот упрощенный, но полностью рабочий код этого класса, который лежит в основе статьи:
python
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
import pandas as pd
import mplfinance as mpf
import matplotlib.pyplot as plt
logger = logging.getLogger(__name__)
class CandleChartMaker:
"""Генератор красивых свечных графиков для Телеграма."""
def __init__(self, plot_dir: str = "logs/tmp/plots"):
self.plot_dir = Path(plot_dir)
self.plot_dir.mkdir(parents=True, exist_ok=True)
def create_chart_from_cache(self, pair: str, timeframe: str, candles: List[Dict],
signal_type: str = None, signal_price: float = None) -> Optional[str]:
"""
Основной метод для создания графика.
"""
if not candles or len(candles) < 5:
logger.warning(f"Недостаточно данных для графика {pair}")
return None
# 1. Превращаем список свечей в DataFrame для mplfinance
df = self._prepare_dataframe(candles)
if len(df) < 2:
return None
# 2. Генерируем уникальное имя файла
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3]
safe_pair = pair.replace('/', '_').replace(' ', '_')
filename = f"{safe_pair}_{timeframe}_{signal_type}_{timestamp}.png"
plot_path = self.plot_dir / filename
# 3. Пытаемся нарисовать график через mplfinance
success = self._create_mplfinance_chart(df, pair, timeframe, plot_path, signal_type, signal_price)
if success:
logger.info(f" График готов: {plot_path}")
return str(plot_path)
else:
logger.error(f" Не удалось создать график для {pair}")
return None
def _prepare_dataframe(self, candles: List[Dict]) -> pd.DataFrame:
"""Конвертирует список свечей в DataFrame, готовый для mplfinance."""
data = []
for c in candles:
# Убеждаемся, что все ключи есть, и конвертируем время
if not all(k in c for k in ['datetime', 'open', 'high', 'low', 'close']):
continue
try:
dt = pd.to_datetime(c['datetime'])
data.append({
'datetime': dt,
'open': float(c['open']),
'high': float(c['high']),
'low': float(c['low']),
'close': float(c['close']),
'volume': float(c.get('volume', 0)),
})
except (ValueError, TypeError) as e:
logger.debug(f"Ошибка парсинга свечи: {e}")
continue
if not data:
return pd.DataFrame()
df = pd.DataFrame(data)
df = df.sort_values('datetime')
# mplfinance требует, чтобы индексом был datetime
df.set_index('datetime', inplace=True)
return df
def _create_mplfinance_chart(self, df: pd.DataFrame, pair: str, timeframe: str,
output_path: Path, signal_type: str = None,
signal_price: float = None) -> bool:
"""Внутренний метод для рисования с помощью mplfinance."""
try:
# Настройка цветовой схемы
mc = mpf.make_marketcolors(
up='#26a69a', # Зеленый для бычьих
down='#ef5350', # Красный для медвежьих
wick='inherit',
volume='in',
edge='inherit'
)
s = mpf.make_mpf_style(marketcolors=mc, gridstyle='--', y_on_right=False)
# Будем добавлять дополнительные линии на график
addplots = []
# 1. Линия цены входа (сигнала)
if signal_price is not None:
signal_line = [signal_price] * len(df)
color = 'green' if signal_type and 'BUY' in signal_type.upper() else 'red'
label = f"Вход: {signal_price:.5f}"
ap = mpf.make_addplot(signal_line, color=color, linestyle='--', width=1, label=label)
addplots.append(ap)
# 2. Скользящие средние (если хватает данных)
if len(df) >= 10:
ma10 = df['close'].rolling(10).mean()
ap_ma10 = mpf.make_addplot(ma10, color='orange', width=0.8, label='MA10')
addplots.append(ap_ma10)
if len(df) >= 20:
ma20 = df['close'].rolling(20).mean()
ap_ma20 = mpf.make_addplot(ma20, color='blue', width=0.8, label='MA20')
addplots.append(ap_ma20)
# Формируем заголовок
title = f"{pair} ({timeframe})"
if signal_type:
title += f" — {signal_type} Сигнал"
# Создаем фигуру. Параметр `returnfig=True` дает нам доступ к объекту Figure
fig, axes = mpf.plot(
df,
type='candle',
style=s,
title=title,
ylabel='Цена',
volume=True, # Показываем объем
addplot=addplots,
figsize=(12, 7),
panel_ratios=(3, 1), # Соотношение графика свечей и объема
returnfig=True,
tight_layout=True
)
# Добавляем немного статистики в правый верхний угол
self._add_stats_text(fig, df, pair)
# Сохраняем
fig.savefig(output_path, dpi=120, bbox_inches='tight')
plt.close(fig)
return True
except Exception as e:
logger.error(f"Ошибка mplfinance: {e}", exc_info=True)
return False
def _add_stats_text(self, fig, df: pd.DataFrame, pair: str):
"""Добавляет текстовую статистику на график."""
last_candle = df.iloc[-1]
price_change = last_candle['close'] - df.iloc[0]['open']
price_change_pct = (price_change / df.iloc[0]['open']) * 100
# Считаем бычьи/медвежьи свечи на графике
bullish = (df['close'] >= df['open']).sum()
bearish = len(df) - bullish
stats_text = (
f" {pair}n"
f"Close: {last_candle['close']:.5f}n"
f"Change: {price_change:+.5f} ({price_change_pct:+.2f}%)n"
f"Bull/Bear: {bullish}/{bearish}"
)
# Размещаем текст в координатах фигуры
fig.text(0.83, 0.85, stats_text, fontsize=8,
bbox=dict(boxstyle="round,pad=0.3", facecolor="white", alpha=0.8))
def cleanup_old_plots(self, max_age_hours: int = 1):
"""Удаляет старые графики, чтобы не забивать диск."""
# ... логика удаления файлов ...
pass
Шаг 4. Интеграция с Телеграмом
Последний шаг — самый простой и приятный. Когда сигнал обнаружен и график создан, вызываем метод нашего TelegramBotRunner (который внутри использует aiogram), передаем ему путь к картинке и данные сигнала.
python
async def _send_signal_to_subscribers(self, signal: Dict, candles_data: List[Dict] = None):
if not self.telegram_manager:
return
try:
# 1. Формируем данные для сообщения
signal_data = {
'pair': signal['pair'],
'direction': 'buy' if signal['direction'] == 'bullish' else 'sell',
'confidence': signal['confidence'],
'timeframe': signal['timeframe'],
'price': signal['current_price'],
'pattern': signal['pattern'],
'strategy': signal.get('strategy', 'unknown'),
'expiry_minutes': signal.get('expiry_minutes', 2) # Время экспирации
}
# 2. СОЗДАЕМ ГРАФИК, используя данные из chart_candles!
chart_path = None
# Берем последние 50 свечей из кеша для графиков
chart_candles = self.get_candles_for_chart(signal['pair'], signal['timeframe'], 50)
if chart_candles:
try:
signal_type = 'BUY' if signal['direction'] == 'bullish' else 'SELL'
chart_path = self.chart_maker.create_chart_from_cache(
pair=signal['pair'],
timeframe=signal['timeframe'],
candles=chart_candles,
signal_type=signal_type,
signal_price=float(signal['current_price'])
)
except Exception as e:
logger.error(f"Ошибка генерации графика: {e}")
# 3. Отправляем в Телеграм
await self.telegram_manager.send_signal(signal_data, chart_path)
# 4. Удаляем временный файл графика (менеджер сделает это сам)
# ...
except Exception as e:
logger.error(f"Ошибка отправки сигнала: {e}")
Результат: что видит пользователь
Вот что в итоге получает пользователь в Телеграме. Вместо скучной строчки — полноценный анализ.
🔴 ТОРГОВЫЙ СИГНАЛ 🔴
Пара: EUR/RUB OTC
Стратегия: Engulfing
Направление: SELL
Таймфрейм: 15s
Экспирация: 2 мин.
Цена: 85.14383
Паттерн: engulfing
Уверенность: 🟢 HIGH
Именно это и нужно трейдеру для принятия решения.
Подводные камни и их решение
В процессе разработки я столкнулся с несколькими проблемами, о которых стоит упомянуть:
- Дубликаты данных. При переподключении к WebSocket или при загрузке истории могли приходить уже обработанные тики. Это приводило к искажению свечей. Решение — дедупликатор, который хранит ключи (пара + таймфрейм + время + цена) и отсеивает повторы.
- Сбой mplfinance. Библиотека отличная, но, как и любой код, иногда падает с неочевидными ошибками. На этот случай у меня был план Б — резервный метод _create_quick_chart, который рисовал примитивный график через чистый matplotlib. Система должна быть отказоустойчивой.
- Утечка памяти. Если генерировать по 1000 графиков в день и не удалять их, диск быстро забьется. Метод cleanup_old_plots решает эту проблему, оставляя графики жить не более часа.
Заключение
В результате получилась самодостаточная, надежная система визуализации — не просто придаток, а полноценная часть торгового робота. Результаты, на мой взгляд, хорошие, вот какие метрики я получил на реальных данных:
- Время создания графика: 100–300 мс.
- Размер файла PNG: 50–100 КБ.
- Память на 1000 графиков: ~100 МБ.
- Поддерживаемые пары: 50+ одновременно.