Lightdash - open-source BI инструмент, работающий поверх dbt-моделей. Не хранит данные сам - читает PostgreSQL/BigQuery/Snowflake через dbt метрики. В отличие от Tableau или Looker, не требует BI-лицензии: self-hosted Lightdash бесплатен, данные остаются в вашем хранилище. Для Kommo интеграция работает через ETL: данные сделок выгружаются из Kommo API в PostgreSQL, dbt строит модели, Lightdash рендерит дашборд.
Lightdash API использует Bearer token (Personal Access Token). Основные операции: POST /api/v1/projects/{id}/explores/{name}/runQuery - выполнить запрос к метрике, GET /api/v1/projects - список проектов. Но большинство работы происходит на уровне dbt-моделей, не через Lightdash API напрямую.
dbt-модель - SQL-трансформация, описанная в .yml с метаданными. Lightdash читает dbt metrics: и dimensions: и строит по ним интерфейс для non-SQL пользователей.
Архитектура: Kommo -> PostgreSQL -> dbt -> Lightdash
Kommo API
-> Python ETL (cron каждые 4 часа)
-> PostgreSQL: таблицы kommo_leads, kommo_contacts
-> dbt: модели kommo_deals, metrics (win_rate, avg_deal_size)
-> Lightdash: дашборд продаж
ETL: Kommo -> PostgreSQL
import requests, os, psycopg2
from datetime import datetime, timezone
KOMMO_SUBDOMAIN = os.environ["KOMMO_SUBDOMAIN"]
KOMMO_TOKEN = os.environ["KOMMO_ACCESS_TOKEN"]
PG_DSN = os.environ["PG_DSN"]
KOMMO_BASE = f"https://{KOMMO_SUBDOMAIN}.kommo.com/api/v4"
KOMMO_HDR = {"Authorization": f"Bearer {KOMMO_TOKEN}"}
def fetch_leads(updated_after_ts: int | None = None) -> list[dict]:
params = {"limit": 250, "with": "contacts,custom_fields_values"}
if updated_after_ts:
params["filter[updated_at][from]"] = updated_after_ts
all_leads = []
page = 1
while True:
params["page"] = page
r = requests.get(f"{KOMMO_BASE}/leads", headers=KOMMO_HDR, params=params)
if r.status_code == 204:
break
data = r.json()
items = data.get("_embedded", {}).get("leads", []) or []
if not items:
break
all_leads.extend(items)
page += 1
return all_leads
def upsert_leads_to_pg(leads: list[dict], conn):
cur = conn.cursor()
cur.execute(
"CREATE TABLE IF NOT EXISTS kommo_leads ("
" id BIGINT PRIMARY KEY,"
" name TEXT,"
" status_id INT,"
" pipeline_id INT,"
" responsible_id INT,"
" price NUMERIC,"
" created_at TIMESTAMPTZ,"
" updated_at TIMESTAMPTZ,"
" closed_at TIMESTAMPTZ,"
" loss_reason_id INT"
")"
)
for lead in leads:
cur.execute(
"INSERT INTO kommo_leads"
" (id, name, status_id, pipeline_id, responsible_id,"
" price, created_at, updated_at, closed_at, loss_reason_id)"
" VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
" ON CONFLICT (id) DO UPDATE SET"
" name = EXCLUDED.name,"
" status_id = EXCLUDED.status_id,"
" pipeline_id = EXCLUDED.pipeline_id,"
" responsible_id = EXCLUDED.responsible_id,"
" price = EXCLUDED.price,"
" updated_at = EXCLUDED.updated_at,"
" closed_at = EXCLUDED.closed_at,"
" loss_reason_id = EXCLUDED.loss_reason_id",
(
lead["id"],
lead.get("name"),
lead.get("status_id"),
lead.get("pipeline_id"),
lead.get("responsible_user_id"),
lead.get("price"),
datetime.fromtimestamp(lead["created_at"], tz=timezone.utc) if lead.get("created_at") else None,
datetime.fromtimestamp(lead["updated_at"], tz=timezone.utc) if lead.get("updated_at") else None,
datetime.fromtimestamp(lead["closed_at"], tz=timezone.utc) if lead.get("closed_at") else None,
lead.get("loss_reason_id"),
))
conn.commit()
cur.close()
def run_etl():
conn = psycopg2.connect(PG_DSN)
leads = fetch_leads()
upsert_leads_to_pg(leads, conn)
conn.close()
print(f"ETL done: {len(leads)} leads")
if __name__ == "__main__":
run_etl()
dbt-модели для Lightdash
# models/kommo_deals.yml
version: 2
models:
- name: kommo_deals
description: "Сделки Kommo с метриками для BI"
columns:
- name: id
description: "ID сделки"
- name: name
description: "Название сделки"
- name: status
description: "Статус: open / won / lost"
- name: pipeline_id
description: "ID воронки"
- name: responsible_id
description: "Ответственный менеджер"
- name: price
description: "Сумма сделки USD"
- name: created_date
description: "Дата создания"
- name: closed_date
description: "Дата закрытия"
- name: days_to_close
description: "Длина цикла продаж (дней)"
meta:
joins: []
metrics:
- name: win_rate
label: "Win Rate %"
model: ref('kommo_deals')
description: "Процент выигранных сделок"
type: average
sql: "CASE WHEN status = 'won' THEN 100.0 ELSE 0 END"
timestamp: created_date
time_grains: [month, quarter]
- name: avg_deal_size
label: "Avg Deal Size"
model: ref('kommo_deals')
description: "Средний размер закрытой сделки"
type: average
sql: "price"
filters:
- field: status
operator: "equals"
value: "'won'"
timestamp: closed_date
time_grains: [month, quarter]
- name: deals_created
label: "Deals Created"
model: ref('kommo_deals')
description: "Количество новых сделок"
type: count
sql: "id"
timestamp: created_date
time_grains: [week, month]
-- models/kommo_deals.sql
SELECT
id,
name,
CASE
WHEN status_id IN ({{ var('kommo_won_stages') }}) THEN 'won'
WHEN status_id IN ({{ var('kommo_lost_stages') }}) THEN 'lost'
ELSE 'open'
END AS status,
pipeline_id,
responsible_id,
price,
created_at::date AS created_date,
closed_at::date AS closed_date,
EXTRACT(DAY FROM (COALESCE(closed_at, NOW()) - created_at))::int AS days_to_close
FROM {{ source('raw', 'kommo_leads') }}
dbt_project.yml
name: 'exceltic_analytics'
version: '1.0.0'
vars:
kommo_won_stages: [142, 143] # ID ваших Won-статусов из Kommo
kommo_lost_stages: [144, 145] # ID ваших Lost-статусов
models:
exceltic_analytics:
+materialized: table
Lightdash self-hosted: быстрый старт
git clone https://github.com/lightdash/lightdash
cd lightdash
cp .env.example .env
# Заполнить LIGHTDASH_SECRET, DB credentials, dbt project path
docker-compose up -d
После запуска: Connect Project -> выбрать dbt core project -> указать путь к моделям. Lightdash автоматически парсит metrics: и columns: из .yml и строит Explorer.
Для кого актуально
Tech-компании с командой разработки, где уже используется dbt + PostgreSQL/BigQuery. Head of Sales хочет дашборд без Tableau-лицензии ($70/user/mo). Данные уже в data warehouse - нужно только построить модели и подключить Lightdash.
Аналогичная интеграция для enterprise BI описана для Kommo + Qlik Sense.
Часто задаваемые вопросы
Нужен ли dbt Cloud или достаточно dbt Core?
Достаточно dbt Core (open-source, бесплатно). Lightdash работает с dbt Core через локальный запуск dbt compile. dbt Cloud дает дополнительно: managed orchestration, version history, IDE в браузере - но для интеграции с Lightdash это опционально.
Как часто запускать ETL из Kommo?
Для оперативной отчётности достаточно каждые 4 часа (cron 0 */4 * * *). Kommo API не имеет rate limit для стандартного использования - 7 запросов/секунду. При 1000+ сделках полный sync занимает 2-5 минут. Для инкрементального sync используйте filter[updated_at][from] с timestamp последнего успешного запуска.
Можно ли добавить данные из HubSpot или других CRM в тот же дашборд?
Да. dbt позволяет объединять данные из нескольких sources в одной модели. Добавьте ETL для HubSpot (отдельная таблица hubspot_deals), создайте dbt-модель all_deals с UNION ALL и маппингом полей. Lightdash увидит объединённую модель как единый источник.
Итог
Kommo + Lightdash - open-source BI без Tableau:
- Python ETL cron:
fetch_leads()+ upsert в PostgreSQL сON CONFLICT DO UPDATE - dbt-модели:
CASE status_id IN (...)для won/lost,days_to_closeметрика - Lightdash
metrics:в .yml: win_rate, avg_deal_size, deals_created с time_grains - Self-hosted бесплатно:
docker-compose upза 30 минут - Никаких Tableau/Looker лицензий - данные остаются в вашем warehouse
Если нужна настройка ETL из Kommo в ваш data warehouse или Lightdash дашборд - опишите задачу команде Exceltic.dev.