Kommo + Tableau: BI-дашборд продаж с актуальными данными из CRM

Tableau - корпоративный стандарт для BI-аналитики в компаниях от 50+ сотрудников. Kommo - CRM с данными воронки продаж. Без интеграции аналитика продаж строится на CSV-экспортах: вручную, раз в неделю, с устаревшими данными. Для компаний, где revenue-решения принимаются на основе дашбордов, задержка данных - это задержка решений.

Tableau не умеет напрямую запрашивать Kommo REST API - Tableau работает с базами данных, файлами или специализированными коннекторами. Поэтому архитектура интеграции всегда включает промежуточный слой: данные из Kommo сначала попадают в реляционную базу (PostgreSQL), а Tableau подключается к ней как к live data source. Этот слой также даёт возможность хранить историческую аналитику - то, чего нет при прямом подключении к Kommo.

В этой статье разберём ETL-пайплайн Kommo -> PostgreSQL -> Tableau с инкрементальной синхронизацией и автоматическим обновлением дашборда через Tableau REST API.

Почему нельзя подключиться к Kommo напрямую из Tableau

Tableau поддерживает три класса источников данных: нативные коннекторы (SQL базы, файлы), JDBC/ODBC-коннекторы и Tableau Web Data Connector (WDC). Kommo REST API не подходит ни под один из них без дополнительного слоя.

Ключеной проблема также в пагинации: Kommo API возвращает максимум 250 сделок за запрос с курсорной пагинацией. Tableau не умеет обрабатывать пагинированные API-ответы - это задача ETL-слоя.

PostgreSQL как промежуточный слой даёт три преимущества: полная история изменений сделок (Kommo хранит только текущее состояние), возможность JOIN с другими источниками (финансы, маркетинг), и скорость запросов для Tableau без ограничений API rate limit.

Архитектура

Kommo REST API -> ETL Python (cron 1h) -> PostgreSQL -> Tableau (live connection)
                                                     -> Tableau REST API (refresh trigger)

ETL запускается по расписанию, делает инкрементальную синхронизацию новых и обновлённых сделок, upsert в PostgreSQL. Tableau подключён к PostgreSQL напрямую - данные всегда не старше 1 часа.

Реализация ETL

Шаг 1 - создание схемы PostgreSQL:

CREATE TABLE IF NOT EXISTS kommo_leads (
    lead_id          BIGINT PRIMARY KEY,
    name             TEXT,
    price            NUMERIC(12, 2),
    status_id        INTEGER,
    status_name      TEXT,
    pipeline_id      INTEGER,
    pipeline_name    TEXT,
    responsible_id   INTEGER,
    responsible_name TEXT,
    created_at       TIMESTAMP,
    updated_at       TIMESTAMP,
    closed_at        TIMESTAMP,
    -- Кастомные поля (примеры)
    cf_industry      TEXT,
    cf_company_size  TEXT,
    cf_source        TEXT,
    -- Мета
    synced_at        TIMESTAMP DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_leads_updated ON kommo_leads(updated_at);
CREATE INDEX IF NOT EXISTS idx_leads_status  ON kommo_leads(status_id);
CREATE INDEX IF NOT EXISTS idx_leads_created ON kommo_leads(created_at);

Шаг 2 - инкрементальная синхронизация из Kommo:

import requests, psycopg2, os
from datetime import datetime, timezone

KOMMO_DOMAIN = "yourdomain.kommo.com"
KOMMO_TOKEN  = "your_kommo_token"
KOMMO_BASE   = f"https://{KOMMO_DOMAIN}/api/v4"

DSN = "postgresql://user:pass@localhost:5432/analytics"

# Маппинг ID статусов на понятные названия
STATUS_NAMES = {
    142: "Успешно реализовано",
    143: "Закрыто и не реализовано",
}

def fetch_leads_since(since_ts: int) -> list:
    """Fetch all leads updated since timestamp via paginated API."""
    hs = requests.Session()
    hs.headers["Authorization"] = f"Bearer {KOMMO_TOKEN}"

    leads, page = [], 1
    while True:
        r = hs.get(f"{KOMMO_BASE}/leads", params={
            "updated_at[from]": since_ts,
            "with":             "contacts,loss_reason",
            "limit":            250,
            "page":             page,
        })
        if r.status_code == 204:
            break
        batch = r.json().get("_embedded", {}).get("leads", [])
        if not batch:
            break
        leads.extend(batch)
        if len(batch) < 250:
            break
        page += 1
    return leads

def get_custom_field(lead: dict, field_code: str) -> str:
    for f in lead.get("custom_fields_values") or []:
        if f.get("field_code") == field_code:
            vals = f.get("values", [])
            return str(vals[0]["value"]) if vals else ""
    return ""

def upsert_leads(leads: list, conn):
    sql = """
        INSERT INTO kommo_leads (
            lead_id, name, price, status_id, status_name,
            pipeline_id, responsible_id,
            created_at, updated_at, closed_at,
            cf_industry, cf_company_size, cf_source, synced_at
        ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,NOW())
        ON CONFLICT (lead_id) DO UPDATE SET
            name             = EXCLUDED.name,
            price            = EXCLUDED.price,
            status_id        = EXCLUDED.status_id,
            status_name      = EXCLUDED.status_name,
            updated_at       = EXCLUDED.updated_at,
            closed_at        = EXCLUDED.closed_at,
            cf_industry      = EXCLUDED.cf_industry,
            cf_company_size  = EXCLUDED.cf_company_size,
            synced_at        = NOW()
    """
    with conn.cursor() as cur:
        for lead in leads:
            closed = lead.get("closed_at")
            cur.execute(sql, (
                lead["id"],
                lead.get("name", ""),
                lead.get("price", 0),
                lead.get("status_id"),
                STATUS_NAMES.get(lead.get("status_id"), f"stage_{lead.get('status_id')}"),
                lead.get("pipeline_id"),
                lead.get("responsible_user_id"),
                datetime.fromtimestamp(lead["created_at"], tz=timezone.utc),
                datetime.fromtimestamp(lead["updated_at"], tz=timezone.utc),
                datetime.fromtimestamp(closed, tz=timezone.utc) if closed else None,
                get_custom_field(lead, "INDUSTRY"),
                get_custom_field(lead, "COMPANY_SIZE"),
                get_custom_field(lead, "LEAD_SOURCE"),
            ))
    conn.commit()

def run_sync():
    conn = psycopg2.connect(DSN)

    # Берём timestamp последней синхронизации
    with conn.cursor() as cur:
        cur.execute("SELECT COALESCE(EXTRACT(EPOCH FROM MAX(updated_at))::BIGINT, 0) FROM kommo_leads")
        since_ts = cur.fetchone()[0]

    leads = fetch_leads_since(since_ts)
    if leads:
        upsert_leads(leads, conn)
        print(f"Synced {len(leads)} leads")

    conn.close()

if __name__ == "__main__":
    run_sync()

Шаг 3 - автоматический refresh Tableau дашборда через REST API:

Tableau REST API позволяет программно запускать Extract Refresh для datasources:

import tableauserverclient as tsc

TABLEAU_SERVER   = "https://your-tableau-server.com"
TABLEAU_SITE     = ""           # пустая строка для Default site
TABLEAU_TOKEN_NAME  = "your_token_name"
TABLEAU_TOKEN_VALUE = "your_personal_access_token"

def trigger_tableau_refresh(datasource_name: str = "Kommo Leads"):
    """Trigger Tableau extract refresh after ETL sync."""
    tableau_auth = tsc.PersonalAccessTokenAuth(
        TABLEAU_TOKEN_NAME, TABLEAU_TOKEN_VALUE, TABLEAU_SITE
    )
    server = tsc.Server(TABLEAU_SERVER, use_server_version=True)

    with server.auth.sign_in(tableau_auth):
        # Найти datasource по имени
        req_opts = tsc.RequestOptions()
        req_opts.filter.add(tsc.Filter(
            tsc.RequestOptions.Field.Name,
            tsc.RequestOptions.Operator.Equals,
            datasource_name
        ))
        all_datasources, _ = server.datasources.get(req_opts)

        if all_datasources:
            ds = all_datasources[0]
            server.datasources.refresh(ds)
            print(f"Refresh triggered for: {ds.name}")

Запускаем run_sync() -> trigger_tableau_refresh() по cron каждый час. Tableau автоматически обновляет вью после завершения refresh.

Ключевые метрики для дашборда Tableau

С данными в PostgreSQL типичный sales-дашборд включает:

  • Воронка продаж - количество и объём сделок по этапам (bar chart)
  • Conversion Rate - соотношение won/total по менеджерам и источникам
  • Deal Velocity - среднее время сделки от создания до закрытия
  • Revenue Forecast - открытые сделки с вероятностью по pipeline stage
  • Leaderboard - топ менеджеров по выручке за период
  • Source Attribution - откуда приходят выигранные сделки

Все эти метрики строятся SQL-запросами к PostgreSQL через Tableau calculated fields.

Реальный кейс

Компания с командой 12 менеджеров, 150-200 активных сделок. Еженедельные CSV-экспорты из Kommo в Excel занимали 2-3 часа. Data-решения принимались на данных недельной давности.

После внедрения PostgreSQL + Tableau:

  • Данные обновляются ежечасно без участия аналитика
  • Revenue forecast строится на актуальных pipeline-данных
  • Время на подготовку отчётов сократилось с 2-3 часов до 0
  • Tableau дашборд открыт на TV-экране в офисе продаж в режиме реального времени

Время разработки ETL: 2 дня. Tableau дашборд: 1-2 дня.

Для кого актуально

Компании с выстроенной BI-практикой на Tableau и активной воронкой в Kommo. Если ваш Tableau Server уже используется для финансовой аналитики, добавление Kommo как источника данных - логичный шаг. Для небольших команд без Tableau-лицензий проще рассмотреть Kommo + Looker Studio (бесплатный инструмент Google).

Если вы используете кастомные интеграции в Kommo CRM для нескольких систем - PostgreSQL как центральный data warehouse позволяет объединить данные из всех источников в одном месте.

Часто задаваемые вопросы

Подходит ли MySQL вместо PostgreSQL?

Да. Tableau поддерживает нативный коннектор для MySQL. Синтаксис ON CONFLICT в нашем ETL нужно заменить на INSERT ... ON DUPLICATE KEY UPDATE для MySQL. В остальном логика идентична.

Tableau Cloud (Tableau Online) поддерживает PostgreSQL?

Да, если ваш PostgreSQL доступен из интернета. Для локального PostgreSQL нужен Tableau Bridge - агент, который устанавливается в вашей сети и соединяет Tableau Cloud с локальной базой. Tableau Bridge входит в стандартную лицензию Tableau Cloud.

Как часто запускать ETL без превышения Kommo rate limit?

Kommo API: 7 запросов/секунду, 1000 запросов/5 минут на аккаунт. При 250 сделках на запрос и 10 000 активных сделках инкрементальная синхронизация занимает 40-50 запросов. Это безопасно даже при запуске каждые 15 минут.

Как хранить историю изменений для анализа стадий воронки?

Добавьте таблицу kommo_lead_history: при каждом sync записывайте строку с (lead_id, status_id, updated_at) если статус изменился. Это позволяет строить Funnel Analysis в Tableau - сколько сделок прошло через каждый этап за период.

Итог

Kommo + Tableau интеграция через PostgreSQL - это надёжная архитектура для enterprise BI. Схема:

  • Cron (каждый час): Python ETL -> Kommo API -> upsert в PostgreSQL
  • Tableau подключён к PostgreSQL как live data source
  • После ETL: Tableau REST API datasources.refresh() обновляет дашборд
  • Инкрементальная синхронизация по updated_at - нагрузка минимальна

Если вы внедряете Tableau для аналитики продаж и хотите видеть данные Kommo в режиме реального времени - опишите задачу команде Exceltic.dev. Спроектируем схему данных под ваши метрики.

Ещё статьи

Все →