Почему нативная интеграция не работает
Plecto - датская компания, специализирующаяся на real-time performance dashboards для команд продаж. Её главные особенности: TV-дашборды для офиса (большой экран с метриками), gamification (лидерборды, achievement badges), instant notifications при достижении KPI. Plecto популярна в Nordics и Western Europe среди B2B sales-команд.
Готовой нативной интеграции Plecto + Kommo нет. Plecto имеет прямые коннекторы к Salesforce, HubSpot, Pipedrive. Для Kommo нужен кастомный подход.
Plecto поддерживает Custom Datasource - механизм, при котором Plecto периодически (каждые несколько минут) опрашивает ваш REST API endpoint и получает данные в заданном формате. Это открывает возможность: создать Python-сервис, который агрегирует данные из Kommo API и отдаёт их Plecto в нужной структуре.
Для компаний, которые строят кастомные интеграции для Kommo, такой подход - стандартный паттерн ETL с кэшированием.
Что реализуется - архитектура решения
Plecto (каждые 5 минут)
--> GET /api/plecto/kommo-kpi
--> Python сервис (с кэшом)
--> Kommo API: GET /leads (текущий день)
--> Kommo API: GET /leads (воронка по этапам)
--> Kommo API: GET /leads (по менеджерам)
<-- JSON в формате Plecto Custom Widget
Технические детали
Plecto Custom Datasource. Настраивается в Plecto -> Data Sources -> Add -> Custom (REST). Plecto делает GET-запрос к вашему endpoint. Ожидаемый формат ответа - JSON array объектов. Каждый объект - одна строка данных с полями date, metric_name, value, employee_id (опционально).
Formат Plecto Custom Widget JSON:
[
{
"date": "2026-06-01T10:00:00Z",
"employee": "John Smith",
"metric": "deals_won",
"value": 3
}
]
Kommo API для агрегации. Используем несколько эндпоинтов:
GET /api/v4/leadsс фильтромfilter[closed_at][from]={today_unix}- сделки закрытые сегодняGET /api/v4/leadsсfilter[statuses][][pipeline_id]={id}&filter[statuses][][status_id]={id}- сделки по этапамGET /api/v4/users- список менеджеров
Кэширование. Kommo API имеет rate limit 7 запросов/секунду. При частых запросах от Plecto нужно кэшировать данные в памяти или Redis с TTL 2-5 минут.
Пошаговая реализация
Шаг 1. FastAPI сервис с кэшированием
import os
import time
import requests
from datetime import datetime, date
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBasic, HTTPBasicCredentials
import secrets
app = FastAPI()
security = HTTPBasic()
KOMMO_DOMAIN = os.environ["KOMMO_DOMAIN"]
KOMMO_TOKEN = os.environ["KOMMO_ACCESS_TOKEN"]
PLECTO_AUTH_USER = os.environ.get("PLECTO_AUTH_USER", "plecto")
PLECTO_AUTH_PASS = os.environ.get("PLECTO_AUTH_PASS", "secret")
# Простой in-memory кэш
_cache: dict = {"data": None, "timestamp": 0, "ttl": 300} # 5 минут
def authenticate(credentials: HTTPBasicCredentials = Depends(security)):
"""Базовая аутентификация для Plecto endpoint."""
correct_user = secrets.compare_digest(credentials.username, PLECTO_AUTH_USER)
correct_pass = secrets.compare_digest(credentials.password, PLECTO_AUTH_PASS)
if not (correct_user and correct_pass):
raise HTTPException(status_code=401, detail="Unauthorized")
def kommo_get(path: str, params: dict = None) -> dict:
"""Запрос к Kommo API."""
url = f"https://{KOMMO_DOMAIN}/api/v4{path}"
headers = {"Authorization": f"Bearer {KOMMO_TOKEN}"}
r = requests.get(url, params=params or {}, headers=headers, timeout=15)
if not r.ok:
return {}
return r.json()
def get_today_timestamp() -> int:
"""Unix timestamp начала сегодняшнего дня (UTC)."""
today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
return int(today.timestamp())
def fetch_kommo_kpi() -> list[dict]:
"""Агрегируем KPI из Kommo API."""
today_ts = get_today_timestamp()
today_str = date.today().isoformat()
result = []
# 1. Deals Won сегодня (по менеджерам)
won_params = {
"filter[closed_at][from]": today_ts,
"filter[statuses][][status_id]": 142, # Won
"limit": 250,
"with": "loss_reason",
}
won_data = kommo_get("/leads", won_params)
won_leads = won_data.get("_embedded", {}).get("leads", [])
# Группируем по responsible_user_id
won_by_user: dict[int, int] = {}
won_value_by_user: dict[int, int] = {}
for lead in won_leads:
uid = lead.get("responsible_user_id", 0)
won_by_user[uid] = won_by_user.get(uid, 0) + 1
won_value_by_user[uid] = won_value_by_user.get(uid, 0) + lead.get("price", 0)
# Получаем имена пользователей
users_data = kommo_get("/users")
users = {u["id"]: u["name"] for u in users_data.get("_embedded", {}).get("users", [])}
for uid, count in won_by_user.items():
employee = users.get(uid, f"User {uid}")
result.append({
"date": f"{today_str}T00:00:00Z",
"employee": employee,
"metric": "deals_won_today",
"value": count,
})
result.append({
"date": f"{today_str}T00:00:00Z",
"employee": employee,
"metric": "won_value_today",
"value": won_value_by_user.get(uid, 0),
})
# 2. Pipeline value по этапам (текущий момент)
pipeline_data = kommo_get("/leads", {
"filter[statuses][][status_id]": "!142,!143", # исключаем Won и Lost
"limit": 250,
})
pipeline_leads = pipeline_data.get("_embedded", {}).get("leads", [])
# Получаем статусы
statuses_data = kommo_get("/pipelines", {"with": "statuses"})
status_names: dict[int, str] = {}
for pipeline in statuses_data.get("_embedded", {}).get("pipelines", []):
for status in pipeline.get("_embedded", {}).get("statuses", []):
status_names[status["id"]] = status["name"]
stage_values: dict[int, int] = {}
stage_counts: dict[int, int] = {}
for lead in pipeline_leads:
sid = lead.get("status_id", 0)
stage_values[sid] = stage_values.get(sid, 0) + lead.get("price", 0)
stage_counts[sid] = stage_counts.get(sid, 0) + 1
for sid, value in stage_values.items():
stage_name = status_names.get(sid, f"Stage {sid}")
result.append({
"date": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
"employee": "Team",
"metric": f"pipeline_{stage_name.lower().replace(' ', '_')}_value",
"value": value,
})
result.append({
"date": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
"employee": "Team",
"metric": f"pipeline_{stage_name.lower().replace(' ', '_')}_count",
"value": stage_counts.get(sid, 0),
})
# 3. Total pipeline value
total_pipeline = sum(stage_values.values())
result.append({
"date": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
"employee": "Team",
"metric": "total_pipeline_value",
"value": total_pipeline,
})
return result
@app.get("/api/plecto/kommo-kpi")
def get_kommo_kpi(credentials: HTTPBasicCredentials = Depends(authenticate)):
"""Endpoint для Plecto Custom Datasource."""
now = time.time()
if _cache["data"] is None or (now - _cache["timestamp"]) > _cache["ttl"]:
_cache["data"] = fetch_kommo_kpi()
_cache["timestamp"] = now
return _cache["data"]
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Шаг 2. Настройка Plecto Custom Datasource
- В Plecto: Data Sources -> Add -> Custom (REST)
- URL:
https://your-service.example.com/api/plecto/kommo-kpi - Authentication: Basic Auth (логин/пароль из
PLECTO_AUTH_USER/PASS) - Refresh interval: 5 минут
- Date field:
date - Numeric fields:
value - Text fields:
employee,metric
Шаг 3. Создание виджетов в Plecto
После подключения datasource создайте виджеты:
- Number widget с формулой
SUM(value) WHERE metric = 'deals_won_today'- сделок закрыто сегодня - Leaderboard с группировкой по
employee, метрикаdeals_won_today- рейтинг менеджеров - Bar chart с метрикой
pipeline_*_count- количество сделок по этапам - KPI Target с целью на день и текущим значением
won_value_today
Реальный кейс с цифрами
Для EU B2B компании с офисом в Амстердаме (8 SDR, 1 sales manager) Plecto TV-дашборд с данными из Kommo создаёт эффект прозрачности и соревнования.
До интеграции: менеджер смотрел отчёты в Kommo в конце дня. SDR не видели прогресс друг друга в реальном времени. Командный результат был неизвестен до финального стендапа.
После интеграции: на TV-экране в офисе обновляется лидерборд каждые 5 минут. При закрытии сделки Plecto воспроизводит звуковое уведомление и анимацию. По отзывам схожих команд, gamification создаёт видимый эффект на активность SDR в последние 2-3 часа рабочего дня - именно тогда, когда обычно падает энергия.
Для кого подходит
Интеграция актуальна для компаний, которые:
- Имеют физический офис, где можно разместить TV с дашбордом
- Используют Kommo как основную CRM sales-команды
- Хотят внедрить gamification и прозрачность KPI без внедрения нового CRM
- Работают в Западной Европе, где Plecto известен как EU-native инструмент (GDPR-compliant, серверы в EU)
Если вам нужна более глубокая аналитика - Kommo + Metabase или Kommo + Power BI дадут больше возможностей для ad-hoc анализа.
Часто задаваемые вопросы
Plecto поддерживает исторические данные или только real-time?
Plecto поддерживает оба режима. Если ваш endpoint возвращает данные с полем date за прошлые периоды, Plecto строит графики за неделю/месяц. Для этого расширьте fetch_kommo_kpi() на получение данных за N дней.
Как добавить метрику «количество звонков»?
Если вы интегрировали OpenPhone или Aircall с Kommo через заметки - можно подсчитывать notes типа call_in и call_out за день через GET /api/v4/leads/{id}/notes. Для команды это более трудоёмко, но возможно.
Какой rate limit у Kommo API и не будет ли проблем с частыми запросами? Kommo API: 7 запросов/секунду. С кэшированием TTL 5 минут ваш endpoint делает 3-5 запросов к Kommo при каждом обновлении - это хорошо в пределах лимита. Без кэша при частых запросах от Plecto получите ошибку 429.
Можно ли показывать данные нескольких воронок одновременно?
Да. Получите список воронок через GET /api/v4/pipelines, итерируйтесь по каждой и добавляйте поле pipeline_name в JSON для Plecto. Затем в Plecto фильтруйте виджет по конкретной воронке.
Если вам нужна интеграция Kommo с Plecto - опишите ваш стек и команду в Exceltic.dev. Разберём архитектуру и оценим объём работ.