Kommo + Lightdash: BI-дашборд продаж на open-source стеке без Tableau

Lightdash - open-source BI инструмент, работающий поверх dbt-моделей. Не хранит данные сам - читает PostgreSQL/BigQuery/Snowflake через dbt метрики. В отличие от Tableau или Looker, не требует BI-лицензии: self-hosted Lightdash бесплатен, данные остаются в вашем хранилище. Для Kommo интеграция работает через ETL: данные сделок выгружаются из Kommo API в PostgreSQL, dbt строит модели, Lightdash рендерит дашборд.

Lightdash API использует Bearer token (Personal Access Token). Основные операции: POST /api/v1/projects/{id}/explores/{name}/runQuery - выполнить запрос к метрике, GET /api/v1/projects - список проектов. Но большинство работы происходит на уровне dbt-моделей, не через Lightdash API напрямую.

dbt-модель - SQL-трансформация, описанная в .yml с метаданными. Lightdash читает dbt metrics: и dimensions: и строит по ним интерфейс для non-SQL пользователей.

Архитектура: Kommo -> PostgreSQL -> dbt -> Lightdash

Kommo API
  -> Python ETL (cron каждые 4 часа)
  -> PostgreSQL: таблицы kommo_leads, kommo_contacts
  -> dbt: модели kommo_deals, metrics (win_rate, avg_deal_size)
  -> Lightdash: дашборд продаж

ETL: Kommo -> PostgreSQL

import requests, os, psycopg2
from datetime import datetime, timezone

KOMMO_SUBDOMAIN = os.environ["KOMMO_SUBDOMAIN"]
KOMMO_TOKEN     = os.environ["KOMMO_ACCESS_TOKEN"]
PG_DSN          = os.environ["PG_DSN"]

KOMMO_BASE = f"https://{KOMMO_SUBDOMAIN}.kommo.com/api/v4"
KOMMO_HDR  = {"Authorization": f"Bearer {KOMMO_TOKEN}"}

def fetch_leads(updated_after_ts: int | None = None) -> list[dict]:
    params = {"limit": 250, "with": "contacts,custom_fields_values"}
    if updated_after_ts:
        params["filter[updated_at][from]"] = updated_after_ts
    all_leads = []
    page = 1
    while True:
        params["page"] = page
        r = requests.get(f"{KOMMO_BASE}/leads", headers=KOMMO_HDR, params=params)
        if r.status_code == 204:
            break
        data = r.json()
        items = data.get("_embedded", {}).get("leads", []) or []
        if not items:
            break
        all_leads.extend(items)
        page += 1
    return all_leads

def upsert_leads_to_pg(leads: list[dict], conn):
    cur = conn.cursor()
    cur.execute(
        "CREATE TABLE IF NOT EXISTS kommo_leads ("
        "    id             BIGINT PRIMARY KEY,"
        "    name           TEXT,"
        "    status_id      INT,"
        "    pipeline_id    INT,"
        "    responsible_id INT,"
        "    price          NUMERIC,"
        "    created_at     TIMESTAMPTZ,"
        "    updated_at     TIMESTAMPTZ,"
        "    closed_at      TIMESTAMPTZ,"
        "    loss_reason_id INT"
        ")"
    )
    for lead in leads:
        cur.execute(
            "INSERT INTO kommo_leads"
            "    (id, name, status_id, pipeline_id, responsible_id,"
            "     price, created_at, updated_at, closed_at, loss_reason_id)"
            " VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
            " ON CONFLICT (id) DO UPDATE SET"
            "    name           = EXCLUDED.name,"
            "    status_id      = EXCLUDED.status_id,"
            "    pipeline_id    = EXCLUDED.pipeline_id,"
            "    responsible_id = EXCLUDED.responsible_id,"
            "    price          = EXCLUDED.price,"
            "    updated_at     = EXCLUDED.updated_at,"
            "    closed_at      = EXCLUDED.closed_at,"
            "    loss_reason_id = EXCLUDED.loss_reason_id",
            (
            lead["id"],
            lead.get("name"),
            lead.get("status_id"),
            lead.get("pipeline_id"),
            lead.get("responsible_user_id"),
            lead.get("price"),
            datetime.fromtimestamp(lead["created_at"], tz=timezone.utc) if lead.get("created_at") else None,
            datetime.fromtimestamp(lead["updated_at"], tz=timezone.utc) if lead.get("updated_at") else None,
            datetime.fromtimestamp(lead["closed_at"],  tz=timezone.utc) if lead.get("closed_at")  else None,
            lead.get("loss_reason_id"),
        ))
    conn.commit()
    cur.close()

def run_etl():
    conn = psycopg2.connect(PG_DSN)
    leads = fetch_leads()
    upsert_leads_to_pg(leads, conn)
    conn.close()
    print(f"ETL done: {len(leads)} leads")

if __name__ == "__main__":
    run_etl()

dbt-модели для Lightdash

# models/kommo_deals.yml
version: 2

models:
  - name: kommo_deals
    description: "Сделки Kommo с метриками для BI"
    columns:
      - name: id
        description: "ID сделки"
      - name: name
        description: "Название сделки"
      - name: status
        description: "Статус: open / won / lost"
      - name: pipeline_id
        description: "ID воронки"
      - name: responsible_id
        description: "Ответственный менеджер"
      - name: price
        description: "Сумма сделки USD"
      - name: created_date
        description: "Дата создания"
      - name: closed_date
        description: "Дата закрытия"
      - name: days_to_close
        description: "Длина цикла продаж (дней)"

    meta:
      joins: []

metrics:
  - name: win_rate
    label: "Win Rate %"
    model: ref('kommo_deals')
    description: "Процент выигранных сделок"
    type: average
    sql: "CASE WHEN status = 'won' THEN 100.0 ELSE 0 END"
    timestamp: created_date
    time_grains: [month, quarter]

  - name: avg_deal_size
    label: "Avg Deal Size"
    model: ref('kommo_deals')
    description: "Средний размер закрытой сделки"
    type: average
    sql: "price"
    filters:
      - field: status
        operator: "equals"
        value: "'won'"
    timestamp: closed_date
    time_grains: [month, quarter]

  - name: deals_created
    label: "Deals Created"
    model: ref('kommo_deals')
    description: "Количество новых сделок"
    type: count
    sql: "id"
    timestamp: created_date
    time_grains: [week, month]
-- models/kommo_deals.sql
SELECT
    id,
    name,
    CASE
        WHEN status_id IN ({{ var('kommo_won_stages') }})  THEN 'won'
        WHEN status_id IN ({{ var('kommo_lost_stages') }}) THEN 'lost'
        ELSE 'open'
    END AS status,
    pipeline_id,
    responsible_id,
    price,
    created_at::date AS created_date,
    closed_at::date  AS closed_date,
    EXTRACT(DAY FROM (COALESCE(closed_at, NOW()) - created_at))::int AS days_to_close
FROM {{ source('raw', 'kommo_leads') }}

dbt_project.yml

name: 'exceltic_analytics'
version: '1.0.0'

vars:
  kommo_won_stages:  [142, 143]  # ID ваших Won-статусов из Kommo
  kommo_lost_stages: [144, 145]  # ID ваших Lost-статусов

models:
  exceltic_analytics:
    +materialized: table

Lightdash self-hosted: быстрый старт

git clone https://github.com/lightdash/lightdash
cd lightdash
cp .env.example .env
# Заполнить LIGHTDASH_SECRET, DB credentials, dbt project path
docker-compose up -d

После запуска: Connect Project -> выбрать dbt core project -> указать путь к моделям. Lightdash автоматически парсит metrics: и columns: из .yml и строит Explorer.

Для кого актуально

Tech-компании с командой разработки, где уже используется dbt + PostgreSQL/BigQuery. Head of Sales хочет дашборд без Tableau-лицензии ($70/user/mo). Данные уже в data warehouse - нужно только построить модели и подключить Lightdash.

Аналогичная интеграция для enterprise BI описана для Kommo + Qlik Sense.

Часто задаваемые вопросы

Нужен ли dbt Cloud или достаточно dbt Core?

Достаточно dbt Core (open-source, бесплатно). Lightdash работает с dbt Core через локальный запуск dbt compile. dbt Cloud дает дополнительно: managed orchestration, version history, IDE в браузере - но для интеграции с Lightdash это опционально.

Как часто запускать ETL из Kommo?

Для оперативной отчётности достаточно каждые 4 часа (cron 0 */4 * * *). Kommo API не имеет rate limit для стандартного использования - 7 запросов/секунду. При 1000+ сделках полный sync занимает 2-5 минут. Для инкрементального sync используйте filter[updated_at][from] с timestamp последнего успешного запуска.

Можно ли добавить данные из HubSpot или других CRM в тот же дашборд?

Да. dbt позволяет объединять данные из нескольких sources в одной модели. Добавьте ETL для HubSpot (отдельная таблица hubspot_deals), создайте dbt-модель all_deals с UNION ALL и маппингом полей. Lightdash увидит объединённую модель как единый источник.

Итог

Kommo + Lightdash - open-source BI без Tableau:

  • Python ETL cron: fetch_leads() + upsert в PostgreSQL с ON CONFLICT DO UPDATE
  • dbt-модели: CASE status_id IN (...) для won/lost, days_to_close метрика
  • Lightdash metrics: в .yml: win_rate, avg_deal_size, deals_created с time_grains
  • Self-hosted бесплатно: docker-compose up за 30 минут
  • Никаких Tableau/Looker лицензий - данные остаются в вашем warehouse

Если нужна настройка ETL из Kommo в ваш data warehouse или Lightdash дашборд - опишите задачу команде Exceltic.dev.

Ещё статьи

Все →