Kommo + Plecto: KPI-дашборд команды продаж в реальном времени

Почему нативная интеграция не работает

Plecto - датская компания, специализирующаяся на real-time performance dashboards для команд продаж. Её главные особенности: TV-дашборды для офиса (большой экран с метриками), gamification (лидерборды, achievement badges), instant notifications при достижении KPI. Plecto популярна в Nordics и Western Europe среди B2B sales-команд.

Готовой нативной интеграции Plecto + Kommo нет. Plecto имеет прямые коннекторы к Salesforce, HubSpot, Pipedrive. Для Kommo нужен кастомный подход.

Plecto поддерживает Custom Datasource - механизм, при котором Plecto периодически (каждые несколько минут) опрашивает ваш REST API endpoint и получает данные в заданном формате. Это открывает возможность: создать Python-сервис, который агрегирует данные из Kommo API и отдаёт их Plecto в нужной структуре.

Для компаний, которые строят кастомные интеграции для Kommo, такой подход - стандартный паттерн ETL с кэшированием.

Что реализуется - архитектура решения

Plecto (каждые 5 минут)
    --> GET /api/plecto/kommo-kpi
        --> Python сервис (с кэшом)
            --> Kommo API: GET /leads (текущий день)
            --> Kommo API: GET /leads (воронка по этапам)
            --> Kommo API: GET /leads (по менеджерам)
        <-- JSON в формате Plecto Custom Widget

Технические детали

Plecto Custom Datasource. Настраивается в Plecto -> Data Sources -> Add -> Custom (REST). Plecto делает GET-запрос к вашему endpoint. Ожидаемый формат ответа - JSON array объектов. Каждый объект - одна строка данных с полями date, metric_name, value, employee_id (опционально).

Formат Plecto Custom Widget JSON:

[
  {
    "date": "2026-06-01T10:00:00Z",
    "employee": "John Smith",
    "metric": "deals_won",
    "value": 3
  }
]

Kommo API для агрегации. Используем несколько эндпоинтов:

  • GET /api/v4/leads с фильтром filter[closed_at][from]={today_unix} - сделки закрытые сегодня
  • GET /api/v4/leads с filter[statuses][][pipeline_id]={id}&filter[statuses][][status_id]={id} - сделки по этапам
  • GET /api/v4/users - список менеджеров

Кэширование. Kommo API имеет rate limit 7 запросов/секунду. При частых запросах от Plecto нужно кэшировать данные в памяти или Redis с TTL 2-5 минут.

Пошаговая реализация

Шаг 1. FastAPI сервис с кэшированием

import os
import time
import requests
from datetime import datetime, date
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBasic, HTTPBasicCredentials
import secrets

app = FastAPI()
security = HTTPBasic()

KOMMO_DOMAIN = os.environ["KOMMO_DOMAIN"]
KOMMO_TOKEN = os.environ["KOMMO_ACCESS_TOKEN"]
PLECTO_AUTH_USER = os.environ.get("PLECTO_AUTH_USER", "plecto")
PLECTO_AUTH_PASS = os.environ.get("PLECTO_AUTH_PASS", "secret")

# Простой in-memory кэш
_cache: dict = {"data": None, "timestamp": 0, "ttl": 300}  # 5 минут


def authenticate(credentials: HTTPBasicCredentials = Depends(security)):
    """Базовая аутентификация для Plecto endpoint."""
    correct_user = secrets.compare_digest(credentials.username, PLECTO_AUTH_USER)
    correct_pass = secrets.compare_digest(credentials.password, PLECTO_AUTH_PASS)
    if not (correct_user and correct_pass):
        raise HTTPException(status_code=401, detail="Unauthorized")


def kommo_get(path: str, params: dict = None) -> dict:
    """Запрос к Kommo API."""
    url = f"https://{KOMMO_DOMAIN}/api/v4{path}"
    headers = {"Authorization": f"Bearer {KOMMO_TOKEN}"}
    r = requests.get(url, params=params or {}, headers=headers, timeout=15)
    if not r.ok:
        return {}
    return r.json()


def get_today_timestamp() -> int:
    """Unix timestamp начала сегодняшнего дня (UTC)."""
    today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
    return int(today.timestamp())


def fetch_kommo_kpi() -> list[dict]:
    """Агрегируем KPI из Kommo API."""
    today_ts = get_today_timestamp()
    today_str = date.today().isoformat()
    result = []

    # 1. Deals Won сегодня (по менеджерам)
    won_params = {
        "filter[closed_at][from]": today_ts,
        "filter[statuses][][status_id]": 142,  # Won
        "limit": 250,
        "with": "loss_reason",
    }
    won_data = kommo_get("/leads", won_params)
    won_leads = won_data.get("_embedded", {}).get("leads", [])

    # Группируем по responsible_user_id
    won_by_user: dict[int, int] = {}
    won_value_by_user: dict[int, int] = {}
    for lead in won_leads:
        uid = lead.get("responsible_user_id", 0)
        won_by_user[uid] = won_by_user.get(uid, 0) + 1
        won_value_by_user[uid] = won_value_by_user.get(uid, 0) + lead.get("price", 0)

    # Получаем имена пользователей
    users_data = kommo_get("/users")
    users = {u["id"]: u["name"] for u in users_data.get("_embedded", {}).get("users", [])}

    for uid, count in won_by_user.items():
        employee = users.get(uid, f"User {uid}")
        result.append({
            "date": f"{today_str}T00:00:00Z",
            "employee": employee,
            "metric": "deals_won_today",
            "value": count,
        })
        result.append({
            "date": f"{today_str}T00:00:00Z",
            "employee": employee,
            "metric": "won_value_today",
            "value": won_value_by_user.get(uid, 0),
        })

    # 2. Pipeline value по этапам (текущий момент)
    pipeline_data = kommo_get("/leads", {
        "filter[statuses][][status_id]": "!142,!143",  # исключаем Won и Lost
        "limit": 250,
    })
    pipeline_leads = pipeline_data.get("_embedded", {}).get("leads", [])

    # Получаем статусы
    statuses_data = kommo_get("/pipelines", {"with": "statuses"})
    status_names: dict[int, str] = {}
    for pipeline in statuses_data.get("_embedded", {}).get("pipelines", []):
        for status in pipeline.get("_embedded", {}).get("statuses", []):
            status_names[status["id"]] = status["name"]

    stage_values: dict[int, int] = {}
    stage_counts: dict[int, int] = {}
    for lead in pipeline_leads:
        sid = lead.get("status_id", 0)
        stage_values[sid] = stage_values.get(sid, 0) + lead.get("price", 0)
        stage_counts[sid] = stage_counts.get(sid, 0) + 1

    for sid, value in stage_values.items():
        stage_name = status_names.get(sid, f"Stage {sid}")
        result.append({
            "date": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
            "employee": "Team",
            "metric": f"pipeline_{stage_name.lower().replace(' ', '_')}_value",
            "value": value,
        })
        result.append({
            "date": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
            "employee": "Team",
            "metric": f"pipeline_{stage_name.lower().replace(' ', '_')}_count",
            "value": stage_counts.get(sid, 0),
        })

    # 3. Total pipeline value
    total_pipeline = sum(stage_values.values())
    result.append({
        "date": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
        "employee": "Team",
        "metric": "total_pipeline_value",
        "value": total_pipeline,
    })

    return result


@app.get("/api/plecto/kommo-kpi")
def get_kommo_kpi(credentials: HTTPBasicCredentials = Depends(authenticate)):
    """Endpoint для Plecto Custom Datasource."""
    now = time.time()
    if _cache["data"] is None or (now - _cache["timestamp"]) > _cache["ttl"]:
        _cache["data"] = fetch_kommo_kpi()
        _cache["timestamp"] = now
    return _cache["data"]


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Шаг 2. Настройка Plecto Custom Datasource

  1. В Plecto: Data Sources -> Add -> Custom (REST)
  2. URL: https://your-service.example.com/api/plecto/kommo-kpi
  3. Authentication: Basic Auth (логин/пароль из PLECTO_AUTH_USER/PASS)
  4. Refresh interval: 5 минут
  5. Date field: date
  6. Numeric fields: value
  7. Text fields: employee, metric

Шаг 3. Создание виджетов в Plecto

После подключения datasource создайте виджеты:

  • Number widget с формулой SUM(value) WHERE metric = 'deals_won_today' - сделок закрыто сегодня
  • Leaderboard с группировкой по employee, метрика deals_won_today - рейтинг менеджеров
  • Bar chart с метрикой pipeline_*_count - количество сделок по этапам
  • KPI Target с целью на день и текущим значением won_value_today

Реальный кейс с цифрами

Для EU B2B компании с офисом в Амстердаме (8 SDR, 1 sales manager) Plecto TV-дашборд с данными из Kommo создаёт эффект прозрачности и соревнования.

До интеграции: менеджер смотрел отчёты в Kommo в конце дня. SDR не видели прогресс друг друга в реальном времени. Командный результат был неизвестен до финального стендапа.

После интеграции: на TV-экране в офисе обновляется лидерборд каждые 5 минут. При закрытии сделки Plecto воспроизводит звуковое уведомление и анимацию. По отзывам схожих команд, gamification создаёт видимый эффект на активность SDR в последние 2-3 часа рабочего дня - именно тогда, когда обычно падает энергия.

Для кого подходит

Интеграция актуальна для компаний, которые:

  • Имеют физический офис, где можно разместить TV с дашбордом
  • Используют Kommo как основную CRM sales-команды
  • Хотят внедрить gamification и прозрачность KPI без внедрения нового CRM
  • Работают в Западной Европе, где Plecto известен как EU-native инструмент (GDPR-compliant, серверы в EU)

Если вам нужна более глубокая аналитика - Kommo + Metabase или Kommo + Power BI дадут больше возможностей для ad-hoc анализа.

Часто задаваемые вопросы

Plecto поддерживает исторические данные или только real-time? Plecto поддерживает оба режима. Если ваш endpoint возвращает данные с полем date за прошлые периоды, Plecto строит графики за неделю/месяц. Для этого расширьте fetch_kommo_kpi() на получение данных за N дней.

Как добавить метрику «количество звонков»? Если вы интегрировали OpenPhone или Aircall с Kommo через заметки - можно подсчитывать notes типа call_in и call_out за день через GET /api/v4/leads/{id}/notes. Для команды это более трудоёмко, но возможно.

Какой rate limit у Kommo API и не будет ли проблем с частыми запросами? Kommo API: 7 запросов/секунду. С кэшированием TTL 5 минут ваш endpoint делает 3-5 запросов к Kommo при каждом обновлении - это хорошо в пределах лимита. Без кэша при частых запросах от Plecto получите ошибку 429.

Можно ли показывать данные нескольких воронок одновременно? Да. Получите список воронок через GET /api/v4/pipelines, итерируйтесь по каждой и добавляйте поле pipeline_name в JSON для Plecto. Затем в Plecto фильтруйте виджет по конкретной воронке.

Если вам нужна интеграция Kommo с Plecto - опишите ваш стек и команду в Exceltic.dev. Разберём архитектуру и оценим объём работ.

Ещё статьи

Все →