Kommo + Zoho Analytics: BI-дашборд продаж для компаний в Zoho-экосистеме

Zoho Analytics - облачный BI-инструмент из Zoho-экосистемы: создаёт интерактивные отчёты и дашборды, поддерживает SQL-запросы, автоматически обновляет данные. Kommo - CRM с данными воронки продаж. Компании, использующие Zoho Books, Zoho Projects или другие Zoho-инструменты, часто выбирают Zoho Analytics как централизованный BI - можно объединить данные из CRM, бухгалтерии и задач в одном дашборде.

Базовая проблема: Zoho Analytics не имеет нативного коннектора для Kommo (это не Zoho CRM). Данные из Kommo нужно загружать через Import API - аналогично PostgreSQL-подходу для Tableau, но с HTTP вместо JDBC.

Zoho Analytics Import API позволяет загружать данные через POST /api/{email}/{dbName}/{tableName} с автоматическим созданием таблиц при первой загрузке. Авторизация через OAuth 2.0 (Client Credentials для server-to-server).

Архитектура

Kommo REST API (инкрементальная выборка по updated_at)
  -> Python ETL (cron каждые 2 часа)
  -> Zoho Analytics Import API (Bulk Import или Row API)
     Table: kommo_leads

Zoho Analytics
  -> Отчёты на базе kommo_leads
  -> Дашборды: воронка, KPI, прогноз выручки

Реализация OAuth 2.0 для Zoho Analytics

import requests, os, json, time
from pathlib import Path

ZA_CLIENT_ID     = os.environ["ZOHO_CLIENT_ID"]
ZA_CLIENT_SECRET = os.environ["ZOHO_CLIENT_SECRET"]
ZA_REFRESH_TOKEN = os.environ["ZOHO_REFRESH_TOKEN"]
ZA_ORG_EMAIL     = os.environ["ZOHO_ORG_EMAIL"]
ZA_DB_NAME       = os.environ["ZOHO_DB_NAME"]     # имя базы в Zoho Analytics
ZA_TABLE_NAME    = "kommo_leads"

TOKEN_CACHE = Path("/tmp/zoho_token.json")

def get_zoho_token() -> str:
    # Проверяем кэшированный токен
    if TOKEN_CACHE.exists():
        cached = json.loads(TOKEN_CACHE.read_text())
        if cached.get("expires_at", 0) > time.time() + 60:
            return cached["access_token"]

    # Refresh
    r = requests.post(
        "https://accounts.zoho.com/oauth/v2/token",
        data={
            "client_id":     ZA_CLIENT_ID,
            "client_secret": ZA_CLIENT_SECRET,
            "refresh_token": ZA_REFRESH_TOKEN,
            "grant_type":    "refresh_token",
        },
    )
    r.raise_for_status()
    data = r.json()

    token_data = {
        "access_token": data["access_token"],
        "expires_at":   time.time() + data.get("expires_in", 3600) - 30,
    }
    TOKEN_CACHE.write_text(json.dumps(token_data))
    return token_data["access_token"]

ETL: Kommo -> Zoho Analytics

KOMMO_DOMAIN = os.environ["KOMMO_DOMAIN"]
KOMMO_TOKEN  = os.environ["KOMMO_TOKEN"]
KOMMO_BASE   = f"https://{KOMMO_DOMAIN}/api/v4"
KOMMO_HDR    = {"Authorization": f"Bearer {KOMMO_TOKEN}"}

def fetch_leads_since(since_ts: int) -> list:
    leads, page = [], 1
    session = requests.Session()
    session.headers.update(KOMMO_HDR)

    while True:
        r = session.get(f"{KOMMO_BASE}/leads", params={
            "updated_at[from]": since_ts,
            "limit": 250,
            "page":  page,
        })
        if r.status_code == 204:
            break
        batch = r.json().get("_embedded", {}).get("leads", [])
        if not batch:
            break
        leads.extend(batch)
        if len(batch) < 250:
            break
        page += 1

    return leads

def leads_to_rows(leads: list) -> list[dict]:
    rows = []
    for lead in leads:
        closed = lead.get("closed_at")
        rows.append({
            "lead_id":           lead["id"],
            "name":              lead.get("name", ""),
            "price":             lead.get("price", 0),
            "status_id":         lead.get("status_id"),
            "pipeline_id":       lead.get("pipeline_id"),
            "responsible_id":    lead.get("responsible_user_id"),
            "created_at":        _ts(lead.get("created_at")),
            "updated_at":        _ts(lead.get("updated_at")),
            "closed_at":         _ts(closed) if closed else "",
        })
    return rows

def _ts(epoch: int | None) -> str:
    if not epoch:
        return ""
    from datetime import datetime, timezone
    return datetime.fromtimestamp(epoch, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

def import_to_zoho(rows: list[dict]):
    if not rows:
        return

    token = get_zoho_token()

    # Формат: JSON array
    import_data = json.dumps(rows)

    url = f"https://analyticsapi.zoho.com/api/{ZA_ORG_EMAIL}/{ZA_DB_NAME}/{ZA_TABLE_NAME}"
    r = requests.post(
        url,
        headers={"Authorization": f"Zoho-oauthtoken {token}"},
        params={
            "ZOHO_ACTION":      "IMPORT",
            "ZOHO_OUTPUT_FORMAT": "JSON",
            "ZOHO_ERROR_FORMAT": "JSON",
            "ZOHO_API_VERSION": "1.0",
            "ZOHO_IMPORT_TYPE": "UPDATEADD",   # Upsert: обновить или добавить
            "ZOHO_AUTO_IDENTIFY": "TRUE",      # Авто-определение типов колонок
        },
        data={"ZOHO_DO_IN_BACKGROUND": "false", "ZOHO_IMPORT_DATA": import_data},
    )
    result = r.json()
    summary = result.get("response", {}).get("result", {}).get("importSummary", {})
    print(f"Imported: {summary.get('totalRecordCount', 0)} records")

def run_etl():
    # Получить timestamp последней синхронизации из файла
    ts_file = Path("/tmp/kommo_last_sync.txt")
    since = int(ts_file.read_text()) if ts_file.exists() else 0

    leads = fetch_leads_since(since)
    if leads:
        rows = leads_to_rows(leads)
        import_to_zoho(rows)

        # Обновить timestamp
        max_ts = max(l.get("updated_at", 0) for l in leads)
        ts_file.write_text(str(max_ts))
        print(f"Synced {len(leads)} leads")

if __name__ == "__main__":
    run_etl()

Дашборды в Zoho Analytics

После первой загрузки данных создайте отчёты в Zoho Analytics:

  • Воронка продаж: Count leads by status_id -> Bar chart
  • Выручка по менеджерам: SUM(price) GROUP BY responsible_id -> Pie chart
  • Динамика закрытий: COUNT(*) WHERE status_id=142 GROUP BY date(closed_at) -> Line chart
  • Average Deal Cycle: AVG(closed_at - created_at) WHERE status_id=142 -> KPI widget

Zoho Analytics поддерживает SQL-запросы напрямую - все стандартные агрегации и JOIN с другими таблицами (например, с данными из Zoho Books).

Реальный кейс

Компания, использующая Zoho Books для бухгалтерии и Kommo для продаж. Хотели видеть выручку из Zoho Books и статусы сделок из Kommo в одном дашборде. Решение: ETL Kommo -> Zoho Analytics, плюс нативный коннектор Zoho Books -> Zoho Analytics. Дашборд обновляется каждые 2 часа. Директор видит конверсию воронки и выставленные счета в одном месте без переключения систем.

Для кого актуально

Компании, уже использующие другие продукты Zoho (Books, Projects, Desk, Campaigns). Если Zoho Analytics уже лицензирован для других данных - добавить Kommo как источник не требует дополнительных затрат, только разработки ETL.

Аналогичный подход для Tableau разобран в Kommo + Tableau: BI-дашборд продаж.

Часто задаваемые вопросы

В чём отличие UPDATEADD от других режимов импорта?

Zoho Analytics поддерживает несколько режимов: APPEND (только добавление), TRUNCATEADD (очистить и добавить), UPDATEADD (обновить существующие записи по ключу или добавить новые). Для инкрементальной синхронизации нужен UPDATEADD с ключом lead_id.

Как настроить автоматический refresh без cron?

Zoho Analytics поддерживает Data Sync - настроить расписание прямо в интерфейсе для SQL-источников. Для Kommo нужен Data Bridge (on-premise компонент) или внешний ETL. Простейший вариант - GitHub Actions или любой scheduler.

Есть ли лимиты на Zoho Analytics Import API?

Да: 10 запросов в минуту для Import API, максимум 25 000 строк за один импорт. При большом объёме данных разбивайте импорт на батчи по 5 000-10 000 строк.

Итог

Kommo + Zoho Analytics:

  • OAuth 2.0 Client Credentials + Refresh Token для авторизации
  • Kommo paginated API (updated_at[from]) -> инкрементальная выборка
  • Import API с ZOHO_IMPORT_TYPE=UPDATEADD -> upsert по lead_id
  • Дашборды в Zoho Analytics с SQL-запросами поверх данных Kommo

Если вы используете Zoho-экосистему и хотите объединить CRM-данные Kommo с другими Zoho-сервисами - обратитесь в Exceltic.dev.

Ещё статьи

Все →