Kommo + Qlik Sense: enterprise BI-дашборд продаж из данных CRM

Qlik Sense - enterprise BI-платформа с ассоциативным движком данных (Associative Engine). В отличие от Tableau или Power BI, Qlik строит граф связей между всеми данными - аналитик может переходить от любого поля к любому без ручного написания JOIN-запросов. Для компаний, использующих Qlik в корпоративной среде, данные из Kommo нужны как один из источников в общем BI-слое. Кастомная интеграция позволяет синхронизировать сделки, контакты, активности из Kommo в Qlik через Qlik Cloud Data Files API или напрямую через Qlik Engine.

Qlik Cloud API использует Bearer Token (API Key из Qlik Cloud). Данные загружаются как CSV через Data Files API или через Qlik REST Connector (настраивается в Qlik Data Integration). Автоматизировать обновление можно через Qlik Application Automation (low-code) или кастомный скрипт с cron.

Qlik Associative Engine - движок данных Qlik: при выборе значения в любом дашборде автоматически фильтруются все связанные данные. Это ключевое отличие от SQL-подхода Tableau/Power BI.

Архитектура для B2B продаж

Два подхода в зависимости от инфраструктуры:

Подход 1 (рекомендуемый): ETL -> Qlik Data File

Ваш ETL сервис (cron каждые 2 часа)
  -> Kommo API: выгрузить leads, contacts, activities
  -> Собрать CSV с нужными полями
  -> Qlik Cloud API: POST /api/v1/data-files (загрузить CSV)
  -> Qlik приложение использует этот файл как источник

Подход 2: PostgreSQL как промежуточный слой

ETL сервис -> PostgreSQL (upsert сделок)
-> Qlik REST Connector -> PostgreSQL
-> Qlik приложение на основе PostgreSQL

Подход 1 проще и не требует PostgreSQL. Подход 2 надёжнее при объёме >50 000 сделок.

Реализация: ETL из Kommo + загрузка в Qlik

import requests, os, io, csv, time
from datetime import datetime, timezone

KOMMO_SUBDOMAIN = os.environ["KOMMO_SUBDOMAIN"]
KOMMO_TOKEN     = os.environ["KOMMO_ACCESS_TOKEN"]
QLIK_TENANT     = os.environ["QLIK_TENANT"]      # mycompany.eu.qlikcloud.com
QLIK_API_KEY    = os.environ["QLIK_API_KEY"]
QLIK_FILE_NAME  = os.environ.get("QLIK_FILE_NAME", "kommo_deals.csv")
QLIK_SPACE_ID   = os.environ.get("QLIK_SPACE_ID", "")  # personal space or team space

KOMMO_BASE = f"https://{KOMMO_SUBDOMAIN}.kommo.com/api/v4"
KOMMO_HDR  = {"Authorization": f"Bearer {KOMMO_TOKEN}"}

QLIK_BASE  = f"https://{QLIK_TENANT}"
QLIK_HDR   = {"Authorization": f"Bearer {QLIK_API_KEY}"}

def fetch_kommo_leads(page: int = 1, limit: int = 250) -> list[dict]:
    all_leads = []
    while True:
        r = requests.get(
            f"{KOMMO_BASE}/leads",
            headers=KOMMO_HDR,
            params={
                "with":   "contacts,custom_fields_values",
                "limit":  limit,
                "page":   page,
                "filter[updated_at][from]": int(time.time()) - 7 * 86400,  # последние 7 дней
            },
        )
        if r.status_code == 204:
            break
        data  = r.json()
        leads = data.get("_embedded", {}).get("leads", [])
        all_leads.extend(leads)
        if len(leads) < limit:
            break
        page += 1
    return all_leads

def leads_to_csv(leads: list[dict]) -> bytes:
    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow([
        "lead_id", "lead_name", "status_id", "pipeline_id",
        "price", "responsible_user_id", "created_at", "updated_at",
        "contact_name", "contact_email",
    ])
    for lead in leads:
        contacts     = lead.get("_embedded", {}).get("contacts", [{}])
        contact_name = contacts[0].get("name", "") if contacts else ""

        email = ""
        for cf in lead.get("custom_fields_values", []) or []:
            if cf.get("field_code") == "EMAIL":
                vals = cf.get("values", [])
                if vals:
                    email = vals[0].get("value", "")
                    break

        writer.writerow([
            lead.get("id"),
            lead.get("name"),
            lead.get("status_id"),
            lead.get("pipeline_id"),
            lead.get("price", 0),
            lead.get("responsible_user_id"),
            datetime.fromtimestamp(lead.get("created_at", 0), tz=timezone.utc).isoformat(),
            datetime.fromtimestamp(lead.get("updated_at", 0), tz=timezone.utc).isoformat(),
            contact_name,
            email,
        ])
    return output.getvalue().encode("utf-8")

def upload_to_qlik(csv_bytes: bytes, file_name: str) -> str:
    # Сначала проверить существует ли файл
    params = {"name": file_name}
    if QLIK_SPACE_ID:
        params["spaceId"] = QLIK_SPACE_ID

    r_list = requests.get(f"{QLIK_BASE}/api/v1/data-files", headers=QLIK_HDR, params=params)
    existing = r_list.json().get("data", [])

    if existing:
        # Обновить существующий файл
        file_id = existing[0]["id"]
        r = requests.put(
            f"{QLIK_BASE}/api/v1/data-files/{file_id}",
            headers=QLIK_HDR,
            files={"File": (file_name, csv_bytes, "text/csv")},
        )
    else:
        # Создать новый
        data = {"name": file_name}
        if QLIK_SPACE_ID:
            data["spaceId"] = QLIK_SPACE_ID
        r = requests.post(
            f"{QLIK_BASE}/api/v1/data-files",
            headers=QLIK_HDR,
            files={"File": (file_name, csv_bytes, "text/csv")},
            data=data,
        )

    r.raise_for_status()
    return r.json().get("id", "")

def reload_qlik_app(app_id: str):
    r = requests.post(
        f"{QLIK_BASE}/api/v1/reloads",
        headers={**QLIK_HDR, "Content-Type": "application/json"},
        json={"appId": app_id, "partial": False},
    )
    return r.json().get("id", "")

def run_etl():
    leads    = fetch_kommo_leads()
    csv_data = leads_to_csv(leads)
    file_id  = upload_to_qlik(csv_data, QLIK_FILE_NAME)
    print(f"Uploaded {len(leads)} deals to Qlik. File ID: {file_id}")
    return file_id

if __name__ == "__main__":
    run_etl()

Расписание: cron каждые 2 часа

0 */2 * * * cd /app && python etl_kommo_qlik.py >> /var/log/kommo_qlik.log 2>&1

Или через Qlik Application Automation: создать Automation с триггером по расписанию, который вызывает ваш ETL-endpoint через HTTP action.

Ключевые метрики для Qlik дашборда продаж

В Qlik Load Script подключить CSV и создать следующие Measures:

  • Win Rate: Count({<status_id={'won'}>} lead_id) / Count(lead_id) * 100
  • Average Deal Size: Avg({<status_id={'won'}>} price)
  • Pipeline Velocity: Sum(price) / Count(distinct responsible_user_id) / Days
  • Stage Conversion: Count по каждому status_id с сравнением периодов
  • Revenue Forecast: Sum price by pipeline_id для открытых сделок

Реальный кейс

Компания с 3 пайплайнами в Kommo, 200+ сделок в месяц. Qlik Sense уже использовался для финансовой аналитики. Данные из Kommo нужны рядом с финансовыми данными в одном дашборде. Кастомный ETL загружает данные каждые 2 часа. CFO видит Revenue Forecast рядом с P&L в едином Qlik приложении.

Для кого актуально

Enterprise-компании, уже использующие Qlik Sense как корпоративный BI-стандарт. Если Qlik лицензирован на уровне компании, добавление данных из Kommo - минимальная разработка. Для новых компаний без BI-платформы - Tableau или Metabase проще в старте.

Аналогичный подход описан для Kommo + Tableau и Kommo + Zoho Analytics.

Часто задаваемые вопросы

Где получить Qlik Cloud API Key?

Qlik Cloud -> Management Console -> API Keys -> Create New Key. Ключ создаётся с правами вашего пользователя. Для production используйте сервисный аккаунт с минимальными правами: Data Files (read/write), Apps (read, reload).

Qlik On-Premise vs Qlik Cloud: разные API?

Qlik Cloud использует REST API (описано выше). Qlik Enterprise on-premise использует Engine API (WebSocket-based, QRPC) - принципиально другой подход. Если у вас on-premise Qlik Sense, взаимодействие с данными происходит через QVD файлы или REST Connector, настраиваемый в Data Load Editor.

Как обновить только изменившиеся данные (incremental load)?

Фильтруйте Kommo по updated_at: передавайте timestamp последней синхронизации в filter[updated_at][from]. Хранить timestamp в файле или Redis. В Qlik Load Script: LOAD * FROM ... WHERE lead_id NOT EXISTS в старом QVD + конкатенировать. Полная перезагрузка проще, incremental load нужен только при >100 000 сделок.

Итог

Kommo + Qlik Sense - enterprise BI из данных CRM:

  • ETL: Kommo API with=contacts,custom_fields_values -> CSV -> Qlik Data Files
  • POST /api/v1/data-files (создание) / PUT /api/v1/data-files/{id} (обновление)
  • Bearer API Key, Space ID для командного пространства
  • Reload приложения: POST /api/v1/reloads после загрузки файла
  • Cron каждые 2 часа для актуальных данных

Если ваша компания на Qlik и нужна интеграция с Kommo - опишите задачу команде Exceltic.dev.

Ещё статьи

Все →