Kommo + Sigma Computing: BI-дашборд продаж на основе CRM-данных

Sigma Computing - cloud BI-платформа с интерфейсом в стиле spreadsheet поверх data warehouse. Для B2B команд, которые ведут продажи в Kommo и хотят полноценную аналитику - conversion rate по этапам, pipeline velocity, когортный анализ - Sigma предоставляет возможности, которых нет в стандартной CRM-аналитике.

Интеграция строится через ETL: Kommo API -> Python-скрипт -> PostgreSQL -> Sigma подключается к PostgreSQL как data source. Sigma API (OAuth2 client_credentials) используется для программного управления embed-ссылками и Input Tables. Для большинства задач достаточно настроить ETL один раз и затем работать в интерфейсе Sigma.

Sigma Computing - BI-инструмент, который работает поверх cloud data warehouse (Snowflake, BigQuery, Redshift, PostgreSQL). Отличается от Metabase и Superset тем, что позволяет аналитикам работать в spreadsheet-интерфейсе без SQL, при этом сохраняя полную гибкость data model.

Почему нативная CRM-аналитика не достаточна

Kommo Analytics показывает стандартные метрики: количество сделок по этапам, воронку, активность менеджеров. Но для борд-отчётов и стратегических решений нужно больше: когортный анализ (какие клиенты из Q3 2025 конвертировались в Q1 2026), multi-touch attribution, cross-pipeline корреляции, сравнение периодов с динамикой.

Стандартный подход - экспорт CSV из Kommo и загрузка в Google Sheets. Это работает для разовых отчётов, но не для живого дашборда. При каждом обновлении - новый экспорт, новый импорт.

Архитектура

Kommo API (deals, contacts, pipelines, events)
  -> Python ETL-скрипт (daily cron / per webhook)
  -> PostgreSQL (kommo_leads, kommo_contacts, kommo_events)

Sigma Computing
  -> Connected Dataset: подключён к PostgreSQL
  -> Workbook с дашбордом продаж
  -> (опционально) Embed API -> iframe в внутреннем портале

ETL: Kommo -> PostgreSQL

import requests, os, psycopg2
from datetime import datetime

KOMMO_SUBDOMAIN = os.environ["KOMMO_SUBDOMAIN"]
KOMMO_TOKEN     = os.environ["KOMMO_ACCESS_TOKEN"]
DB_URL          = os.environ["DATABASE_URL"]  # postgres://user:pass@host/db

KOMMO_BASE = f"https://{KOMMO_SUBDOMAIN}.kommo.com/api/v4"
KOMMO_HDR  = {"Authorization": f"Bearer {KOMMO_TOKEN}"}

def fetch_leads(page: int = 1, limit: int = 250) -> list:
    r = requests.get(
        f"{KOMMO_BASE}/leads",
        headers=KOMMO_HDR,
        params={
            "page":  page,
            "limit": limit,
            "with":  "contacts,pipeline,custom_fields_values",
        },
    )
    if r.status_code == 204:
        return []
    r.raise_for_status()
    return r.json().get("_embedded", {}).get("leads", [])

def fetch_all_leads() -> list:
    all_leads = []
    page = 1
    while True:
        batch = fetch_leads(page)
        if not batch:
            break
        all_leads.extend(batch)
        page += 1
    return all_leads

def upsert_leads(conn, leads: list):
    with conn.cursor() as cur:
        cur.execute(
            "CREATE TABLE IF NOT EXISTS kommo_leads ("
            "  id           BIGINT PRIMARY KEY,"
            "  name         TEXT,"
            "  status_id    INT,"
            "  pipeline_id  INT,"
            "  price        BIGINT,"
            "  created_at   TIMESTAMPTZ,"
            "  closed_at    TIMESTAMPTZ,"
            "  updated_at   TIMESTAMPTZ,"
            "  responsible  INT,"
            "  loss_reason  TEXT"
            ")"
        )
        for lead in leads:
            cur.execute(
                "INSERT INTO kommo_leads"
                "  (id, name, status_id, pipeline_id, price,"
                "   created_at, closed_at, updated_at, responsible)"
                " VALUES (%s,%s,%s,%s,%s,"
                "   to_timestamp(%s), to_timestamp(%s), to_timestamp(%s), %s)"
                " ON CONFLICT (id) DO UPDATE SET"
                "  name        = EXCLUDED.name,"
                "  status_id   = EXCLUDED.status_id,"
                "  price       = EXCLUDED.price,"
                "  closed_at   = EXCLUDED.closed_at,"
                "  updated_at  = EXCLUDED.updated_at",
                (
                    lead["id"],
                    lead.get("name"),
                    lead.get("status_id"),
                    lead.get("pipeline_id"),
                    lead.get("price", 0),
                    lead.get("created_at"),
                    lead.get("closed_at"),
                    lead.get("updated_at"),
                    lead.get("responsible_user_id"),
                ),
            )
    conn.commit()

def run_etl():
    leads = fetch_all_leads()
    print(f"Fetched {len(leads)} leads")
    conn = psycopg2.connect(DB_URL)
    upsert_leads(conn, leads)
    conn.close()
    print(f"ETL done: {datetime.now()}")

if __name__ == "__main__":
    run_etl()

Подключение Sigma к PostgreSQL

В Sigma Admin Portal -> Connections -> New Connection:

  • Type: PostgreSQL
  • Host, Port, Database, User, Password - из вашего PostgreSQL
  • После подключения: Sigma автоматически обнаруживает таблицы

Из таблицы kommo_leads создайте Dataset в Sigma, добавьте вычисляемые колонки:

  • win_rate: CASE WHEN status_id = {won_id} THEN 1 ELSE 0 END
  • deal_duration_days: DATEDIFF('day', created_at, COALESCE(closed_at, NOW()))
  • pipeline_stage: JOIN с таблицей kommo_statuses (если экспортируете)

Sigma API: Embed для внутреннего портала

import time, hmac, hashlib, urllib.parse, os

SIGMA_CLIENT_ID     = os.environ["SIGMA_CLIENT_ID"]
SIGMA_CLIENT_SECRET = os.environ["SIGMA_CLIENT_SECRET"]
SIGMA_EMBED_SECRET  = os.environ["SIGMA_EMBED_SECRET"]

def get_sigma_token() -> str:
    r = requests.post(
        "https://aws-api.sigmacomputing.com/v2/auth/token",
        data={
            "grant_type":    "client_credentials",
            "client_id":     SIGMA_CLIENT_ID,
            "client_secret": SIGMA_CLIENT_SECRET,
        },
    )
    r.raise_for_status()
    return r.json()["access_token"]

def generate_embed_url(workbook_id: str,
                        user_email: str, user_team: str) -> str:
    nonce   = os.urandom(16).hex()
    ts      = str(int(time.time()))
    params  = {
        "workbookId":  workbook_id,
        "email":       user_email,
        "team":        user_team,
        "nonce":       nonce,
        "time":        ts,
    }
    query   = urllib.parse.urlencode(params)
    sig_input = f"/embed/explore?{query}"
    signature = hmac.new(
        SIGMA_EMBED_SECRET.encode(), sig_input.encode(), hashlib.sha256
    ).hexdigest()
    return f"https://app.sigmacomputing.com/embed/explore?{query}&signature={signature}"

Embed URL можно передавать в iframe на внутреннем портале или dashboard-странице.

Реальный кейс

Компания с 4 pipeline в Kommo (разные продукты) и командой 25 человек. До интеграции: board-отчёт готовил аналитик вручную каждые 2 недели - 4-5 часов на выгрузку, чистку и построение charts в Google Sheets. После: ETL запускается по cron ежедневно, Sigma-дашборд обновляется в реальном времени. Board получает живой отчёт без участия аналитика.

Для кого актуально

B2B компании с 3+ pipeline в Kommo и потребностью в кросс-pipeline аналитике. Sigma особенно ценится в компаниях, где аналитики привыкли к spreadsheet-интерфейсу, но нужны возможности BI: custom metrics, drill-down, cohort analysis. Если у вас уже есть PostgreSQL или data warehouse - добавить Sigma как BI-layer минимально инвазивно.

Другие аналитические интеграции: Kommo + Hex (Python notebooks), Kommo + Lightdash (dbt + BI).

Часто задаваемые вопросы

Работает ли Sigma с Snowflake вместо PostgreSQL?

Да - Sigma изначально создавался для Snowflake. PostgreSQL - один из многих поддерживаемых коннекторов. Для enterprise-объёмов (10M+ записей) рекомендуем Snowflake или BigQuery - PostgreSQL начинает тормозить на больших аналитических запросах.

Как часто нужно запускать ETL?

Зависит от потребностей. Для ежедневного board-отчёта - cron раз в ночь. Для живого sales dashboard - webhook-триггер при каждом изменении сделки в Kommo (leads.update webhook) плюс incremental upsert по updated_at.

Поддерживает ли Sigma Row Level Security для разных менеджеров?

Да. Sigma поддерживает User Attributes: при создании embed URL передаёте userAttributes: {"region": "EU"}, и Dataset filter автоматически ограничивает данные по этому атрибуту. Каждый менеджер видит только свои сделки.

Чем Sigma отличается от Metabase для этой задачи?

Metabase проще в настройке и дешевле. Sigma сильнее в кастомных вычислениях, spreadsheet-интерфейсе и работе с большими объёмами данных. Для команды с SQL-аналитиками - Sigma даёт больше гибкости. Для команды без технических аналитиков - Metabase с его Question builder проще в использовании.

Итог

Kommo + Sigma Computing - BI-дашборд из CRM-данных:

  • ETL: Kommo API -> Python -> PostgreSQL (upsert по id)
  • Sigma: Connected Dataset поверх PostgreSQL -> workbook с метриками
  • Sigma API: OAuth2 client_credentials, embed URL через HMAC-подпись
  • Cron daily или webhook-triggered incremental sync
  • Row Level Security через User Attributes для разных ролей

Если ваша команда хочет построить аналитику продаж поверх Kommo и Sigma Computing - опишите задачу команде Exceltic.dev.

Ещё статьи

Все →