Tableau - корпоративный стандарт для BI-аналитики в компаниях от 50+ сотрудников. Kommo - CRM с данными воронки продаж. Без интеграции аналитика продаж строится на CSV-экспортах: вручную, раз в неделю, с устаревшими данными. Для компаний, где revenue-решения принимаются на основе дашбордов, задержка данных - это задержка решений.
Tableau не умеет напрямую запрашивать Kommo REST API - Tableau работает с базами данных, файлами или специализированными коннекторами. Поэтому архитектура интеграции всегда включает промежуточный слой: данные из Kommo сначала попадают в реляционную базу (PostgreSQL), а Tableau подключается к ней как к live data source. Этот слой также даёт возможность хранить историческую аналитику - то, чего нет при прямом подключении к Kommo.
В этой статье разберём ETL-пайплайн Kommo -> PostgreSQL -> Tableau с инкрементальной синхронизацией и автоматическим обновлением дашборда через Tableau REST API.
Почему нельзя подключиться к Kommo напрямую из Tableau
Tableau поддерживает три класса источников данных: нативные коннекторы (SQL базы, файлы), JDBC/ODBC-коннекторы и Tableau Web Data Connector (WDC). Kommo REST API не подходит ни под один из них без дополнительного слоя.
Ключеной проблема также в пагинации: Kommo API возвращает максимум 250 сделок за запрос с курсорной пагинацией. Tableau не умеет обрабатывать пагинированные API-ответы - это задача ETL-слоя.
PostgreSQL как промежуточный слой даёт три преимущества: полная история изменений сделок (Kommo хранит только текущее состояние), возможность JOIN с другими источниками (финансы, маркетинг), и скорость запросов для Tableau без ограничений API rate limit.
Архитектура
Kommo REST API -> ETL Python (cron 1h) -> PostgreSQL -> Tableau (live connection)
-> Tableau REST API (refresh trigger)
ETL запускается по расписанию, делает инкрементальную синхронизацию новых и обновлённых сделок, upsert в PostgreSQL. Tableau подключён к PostgreSQL напрямую - данные всегда не старше 1 часа.
Реализация ETL
Шаг 1 - создание схемы PostgreSQL:
CREATE TABLE IF NOT EXISTS kommo_leads (
lead_id BIGINT PRIMARY KEY,
name TEXT,
price NUMERIC(12, 2),
status_id INTEGER,
status_name TEXT,
pipeline_id INTEGER,
pipeline_name TEXT,
responsible_id INTEGER,
responsible_name TEXT,
created_at TIMESTAMP,
updated_at TIMESTAMP,
closed_at TIMESTAMP,
-- Кастомные поля (примеры)
cf_industry TEXT,
cf_company_size TEXT,
cf_source TEXT,
-- Мета
synced_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_leads_updated ON kommo_leads(updated_at);
CREATE INDEX IF NOT EXISTS idx_leads_status ON kommo_leads(status_id);
CREATE INDEX IF NOT EXISTS idx_leads_created ON kommo_leads(created_at);
Шаг 2 - инкрементальная синхронизация из Kommo:
import requests, psycopg2, os
from datetime import datetime, timezone
KOMMO_DOMAIN = "yourdomain.kommo.com"
KOMMO_TOKEN = "your_kommo_token"
KOMMO_BASE = f"https://{KOMMO_DOMAIN}/api/v4"
DSN = "postgresql://user:pass@localhost:5432/analytics"
# Маппинг ID статусов на понятные названия
STATUS_NAMES = {
142: "Успешно реализовано",
143: "Закрыто и не реализовано",
}
def fetch_leads_since(since_ts: int) -> list:
"""Fetch all leads updated since timestamp via paginated API."""
hs = requests.Session()
hs.headers["Authorization"] = f"Bearer {KOMMO_TOKEN}"
leads, page = [], 1
while True:
r = hs.get(f"{KOMMO_BASE}/leads", params={
"updated_at[from]": since_ts,
"with": "contacts,loss_reason",
"limit": 250,
"page": page,
})
if r.status_code == 204:
break
batch = r.json().get("_embedded", {}).get("leads", [])
if not batch:
break
leads.extend(batch)
if len(batch) < 250:
break
page += 1
return leads
def get_custom_field(lead: dict, field_code: str) -> str:
for f in lead.get("custom_fields_values") or []:
if f.get("field_code") == field_code:
vals = f.get("values", [])
return str(vals[0]["value"]) if vals else ""
return ""
def upsert_leads(leads: list, conn):
sql = """
INSERT INTO kommo_leads (
lead_id, name, price, status_id, status_name,
pipeline_id, responsible_id,
created_at, updated_at, closed_at,
cf_industry, cf_company_size, cf_source, synced_at
) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,NOW())
ON CONFLICT (lead_id) DO UPDATE SET
name = EXCLUDED.name,
price = EXCLUDED.price,
status_id = EXCLUDED.status_id,
status_name = EXCLUDED.status_name,
updated_at = EXCLUDED.updated_at,
closed_at = EXCLUDED.closed_at,
cf_industry = EXCLUDED.cf_industry,
cf_company_size = EXCLUDED.cf_company_size,
synced_at = NOW()
"""
with conn.cursor() as cur:
for lead in leads:
closed = lead.get("closed_at")
cur.execute(sql, (
lead["id"],
lead.get("name", ""),
lead.get("price", 0),
lead.get("status_id"),
STATUS_NAMES.get(lead.get("status_id"), f"stage_{lead.get('status_id')}"),
lead.get("pipeline_id"),
lead.get("responsible_user_id"),
datetime.fromtimestamp(lead["created_at"], tz=timezone.utc),
datetime.fromtimestamp(lead["updated_at"], tz=timezone.utc),
datetime.fromtimestamp(closed, tz=timezone.utc) if closed else None,
get_custom_field(lead, "INDUSTRY"),
get_custom_field(lead, "COMPANY_SIZE"),
get_custom_field(lead, "LEAD_SOURCE"),
))
conn.commit()
def run_sync():
conn = psycopg2.connect(DSN)
# Берём timestamp последней синхронизации
with conn.cursor() as cur:
cur.execute("SELECT COALESCE(EXTRACT(EPOCH FROM MAX(updated_at))::BIGINT, 0) FROM kommo_leads")
since_ts = cur.fetchone()[0]
leads = fetch_leads_since(since_ts)
if leads:
upsert_leads(leads, conn)
print(f"Synced {len(leads)} leads")
conn.close()
if __name__ == "__main__":
run_sync()
Шаг 3 - автоматический refresh Tableau дашборда через REST API:
Tableau REST API позволяет программно запускать Extract Refresh для datasources:
import tableauserverclient as tsc
TABLEAU_SERVER = "https://your-tableau-server.com"
TABLEAU_SITE = "" # пустая строка для Default site
TABLEAU_TOKEN_NAME = "your_token_name"
TABLEAU_TOKEN_VALUE = "your_personal_access_token"
def trigger_tableau_refresh(datasource_name: str = "Kommo Leads"):
"""Trigger Tableau extract refresh after ETL sync."""
tableau_auth = tsc.PersonalAccessTokenAuth(
TABLEAU_TOKEN_NAME, TABLEAU_TOKEN_VALUE, TABLEAU_SITE
)
server = tsc.Server(TABLEAU_SERVER, use_server_version=True)
with server.auth.sign_in(tableau_auth):
# Найти datasource по имени
req_opts = tsc.RequestOptions()
req_opts.filter.add(tsc.Filter(
tsc.RequestOptions.Field.Name,
tsc.RequestOptions.Operator.Equals,
datasource_name
))
all_datasources, _ = server.datasources.get(req_opts)
if all_datasources:
ds = all_datasources[0]
server.datasources.refresh(ds)
print(f"Refresh triggered for: {ds.name}")
Запускаем run_sync() -> trigger_tableau_refresh() по cron каждый час. Tableau автоматически обновляет вью после завершения refresh.
Ключевые метрики для дашборда Tableau
С данными в PostgreSQL типичный sales-дашборд включает:
- Воронка продаж - количество и объём сделок по этапам (bar chart)
- Conversion Rate - соотношение won/total по менеджерам и источникам
- Deal Velocity - среднее время сделки от создания до закрытия
- Revenue Forecast - открытые сделки с вероятностью по pipeline stage
- Leaderboard - топ менеджеров по выручке за период
- Source Attribution - откуда приходят выигранные сделки
Все эти метрики строятся SQL-запросами к PostgreSQL через Tableau calculated fields.
Реальный кейс
Компания с командой 12 менеджеров, 150-200 активных сделок. Еженедельные CSV-экспорты из Kommo в Excel занимали 2-3 часа. Data-решения принимались на данных недельной давности.
После внедрения PostgreSQL + Tableau:
- Данные обновляются ежечасно без участия аналитика
- Revenue forecast строится на актуальных pipeline-данных
- Время на подготовку отчётов сократилось с 2-3 часов до 0
- Tableau дашборд открыт на TV-экране в офисе продаж в режиме реального времени
Время разработки ETL: 2 дня. Tableau дашборд: 1-2 дня.
Для кого актуально
Компании с выстроенной BI-практикой на Tableau и активной воронкой в Kommo. Если ваш Tableau Server уже используется для финансовой аналитики, добавление Kommo как источника данных - логичный шаг. Для небольших команд без Tableau-лицензий проще рассмотреть Kommo + Looker Studio (бесплатный инструмент Google).
Если вы используете кастомные интеграции в Kommo CRM для нескольких систем - PostgreSQL как центральный data warehouse позволяет объединить данные из всех источников в одном месте.
Часто задаваемые вопросы
Подходит ли MySQL вместо PostgreSQL?
Да. Tableau поддерживает нативный коннектор для MySQL. Синтаксис ON CONFLICT в нашем ETL нужно заменить на INSERT ... ON DUPLICATE KEY UPDATE для MySQL. В остальном логика идентична.
Tableau Cloud (Tableau Online) поддерживает PostgreSQL?
Да, если ваш PostgreSQL доступен из интернета. Для локального PostgreSQL нужен Tableau Bridge - агент, который устанавливается в вашей сети и соединяет Tableau Cloud с локальной базой. Tableau Bridge входит в стандартную лицензию Tableau Cloud.
Как часто запускать ETL без превышения Kommo rate limit?
Kommo API: 7 запросов/секунду, 1000 запросов/5 минут на аккаунт. При 250 сделках на запрос и 10 000 активных сделках инкрементальная синхронизация занимает 40-50 запросов. Это безопасно даже при запуске каждые 15 минут.
Как хранить историю изменений для анализа стадий воронки?
Добавьте таблицу kommo_lead_history: при каждом sync записывайте строку с (lead_id, status_id, updated_at) если статус изменился. Это позволяет строить Funnel Analysis в Tableau - сколько сделок прошло через каждый этап за период.
Итог
Kommo + Tableau интеграция через PostgreSQL - это надёжная архитектура для enterprise BI. Схема:
- Cron (каждый час): Python ETL -> Kommo API -> upsert в PostgreSQL
- Tableau подключён к PostgreSQL как live data source
- После ETL: Tableau REST API
datasources.refresh()обновляет дашборд - Инкрементальная синхронизация по
updated_at- нагрузка минимальна
Если вы внедряете Tableau для аналитики продаж и хотите видеть данные Kommo в режиме реального времени - опишите задачу команде Exceltic.dev. Спроектируем схему данных под ваши метрики.