Zoho Analytics - облачный BI-инструмент из Zoho-экосистемы: создаёт интерактивные отчёты и дашборды, поддерживает SQL-запросы, автоматически обновляет данные. Kommo - CRM с данными воронки продаж. Компании, использующие Zoho Books, Zoho Projects или другие Zoho-инструменты, часто выбирают Zoho Analytics как централизованный BI - можно объединить данные из CRM, бухгалтерии и задач в одном дашборде.
Базовая проблема: Zoho Analytics не имеет нативного коннектора для Kommo (это не Zoho CRM). Данные из Kommo нужно загружать через Import API - аналогично PostgreSQL-подходу для Tableau, но с HTTP вместо JDBC.
Zoho Analytics Import API позволяет загружать данные через POST /api/{email}/{dbName}/{tableName} с автоматическим созданием таблиц при первой загрузке. Авторизация через OAuth 2.0 (Client Credentials для server-to-server).
Архитектура
Kommo REST API (инкрементальная выборка по updated_at)
-> Python ETL (cron каждые 2 часа)
-> Zoho Analytics Import API (Bulk Import или Row API)
Table: kommo_leads
Zoho Analytics
-> Отчёты на базе kommo_leads
-> Дашборды: воронка, KPI, прогноз выручки
Реализация OAuth 2.0 для Zoho Analytics
import requests, os, json, time
from pathlib import Path
ZA_CLIENT_ID = os.environ["ZOHO_CLIENT_ID"]
ZA_CLIENT_SECRET = os.environ["ZOHO_CLIENT_SECRET"]
ZA_REFRESH_TOKEN = os.environ["ZOHO_REFRESH_TOKEN"]
ZA_ORG_EMAIL = os.environ["ZOHO_ORG_EMAIL"]
ZA_DB_NAME = os.environ["ZOHO_DB_NAME"] # имя базы в Zoho Analytics
ZA_TABLE_NAME = "kommo_leads"
TOKEN_CACHE = Path("/tmp/zoho_token.json")
def get_zoho_token() -> str:
# Проверяем кэшированный токен
if TOKEN_CACHE.exists():
cached = json.loads(TOKEN_CACHE.read_text())
if cached.get("expires_at", 0) > time.time() + 60:
return cached["access_token"]
# Refresh
r = requests.post(
"https://accounts.zoho.com/oauth/v2/token",
data={
"client_id": ZA_CLIENT_ID,
"client_secret": ZA_CLIENT_SECRET,
"refresh_token": ZA_REFRESH_TOKEN,
"grant_type": "refresh_token",
},
)
r.raise_for_status()
data = r.json()
token_data = {
"access_token": data["access_token"],
"expires_at": time.time() + data.get("expires_in", 3600) - 30,
}
TOKEN_CACHE.write_text(json.dumps(token_data))
return token_data["access_token"]
ETL: Kommo -> Zoho Analytics
KOMMO_DOMAIN = os.environ["KOMMO_DOMAIN"]
KOMMO_TOKEN = os.environ["KOMMO_TOKEN"]
KOMMO_BASE = f"https://{KOMMO_DOMAIN}/api/v4"
KOMMO_HDR = {"Authorization": f"Bearer {KOMMO_TOKEN}"}
def fetch_leads_since(since_ts: int) -> list:
leads, page = [], 1
session = requests.Session()
session.headers.update(KOMMO_HDR)
while True:
r = session.get(f"{KOMMO_BASE}/leads", params={
"updated_at[from]": since_ts,
"limit": 250,
"page": page,
})
if r.status_code == 204:
break
batch = r.json().get("_embedded", {}).get("leads", [])
if not batch:
break
leads.extend(batch)
if len(batch) < 250:
break
page += 1
return leads
def leads_to_rows(leads: list) -> list[dict]:
rows = []
for lead in leads:
closed = lead.get("closed_at")
rows.append({
"lead_id": lead["id"],
"name": lead.get("name", ""),
"price": lead.get("price", 0),
"status_id": lead.get("status_id"),
"pipeline_id": lead.get("pipeline_id"),
"responsible_id": lead.get("responsible_user_id"),
"created_at": _ts(lead.get("created_at")),
"updated_at": _ts(lead.get("updated_at")),
"closed_at": _ts(closed) if closed else "",
})
return rows
def _ts(epoch: int | None) -> str:
if not epoch:
return ""
from datetime import datetime, timezone
return datetime.fromtimestamp(epoch, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
def import_to_zoho(rows: list[dict]):
if not rows:
return
token = get_zoho_token()
# Формат: JSON array
import_data = json.dumps(rows)
url = f"https://analyticsapi.zoho.com/api/{ZA_ORG_EMAIL}/{ZA_DB_NAME}/{ZA_TABLE_NAME}"
r = requests.post(
url,
headers={"Authorization": f"Zoho-oauthtoken {token}"},
params={
"ZOHO_ACTION": "IMPORT",
"ZOHO_OUTPUT_FORMAT": "JSON",
"ZOHO_ERROR_FORMAT": "JSON",
"ZOHO_API_VERSION": "1.0",
"ZOHO_IMPORT_TYPE": "UPDATEADD", # Upsert: обновить или добавить
"ZOHO_AUTO_IDENTIFY": "TRUE", # Авто-определение типов колонок
},
data={"ZOHO_DO_IN_BACKGROUND": "false", "ZOHO_IMPORT_DATA": import_data},
)
result = r.json()
summary = result.get("response", {}).get("result", {}).get("importSummary", {})
print(f"Imported: {summary.get('totalRecordCount', 0)} records")
def run_etl():
# Получить timestamp последней синхронизации из файла
ts_file = Path("/tmp/kommo_last_sync.txt")
since = int(ts_file.read_text()) if ts_file.exists() else 0
leads = fetch_leads_since(since)
if leads:
rows = leads_to_rows(leads)
import_to_zoho(rows)
# Обновить timestamp
max_ts = max(l.get("updated_at", 0) for l in leads)
ts_file.write_text(str(max_ts))
print(f"Synced {len(leads)} leads")
if __name__ == "__main__":
run_etl()
Дашборды в Zoho Analytics
После первой загрузки данных создайте отчёты в Zoho Analytics:
- Воронка продаж: Count leads by
status_id-> Bar chart - Выручка по менеджерам:
SUM(price) GROUP BY responsible_id-> Pie chart - Динамика закрытий:
COUNT(*) WHERE status_id=142 GROUP BY date(closed_at)-> Line chart - Average Deal Cycle:
AVG(closed_at - created_at) WHERE status_id=142-> KPI widget
Zoho Analytics поддерживает SQL-запросы напрямую - все стандартные агрегации и JOIN с другими таблицами (например, с данными из Zoho Books).
Реальный кейс
Компания, использующая Zoho Books для бухгалтерии и Kommo для продаж. Хотели видеть выручку из Zoho Books и статусы сделок из Kommo в одном дашборде. Решение: ETL Kommo -> Zoho Analytics, плюс нативный коннектор Zoho Books -> Zoho Analytics. Дашборд обновляется каждые 2 часа. Директор видит конверсию воронки и выставленные счета в одном месте без переключения систем.
Для кого актуально
Компании, уже использующие другие продукты Zoho (Books, Projects, Desk, Campaigns). Если Zoho Analytics уже лицензирован для других данных - добавить Kommo как источник не требует дополнительных затрат, только разработки ETL.
Аналогичный подход для Tableau разобран в Kommo + Tableau: BI-дашборд продаж.
Часто задаваемые вопросы
В чём отличие UPDATEADD от других режимов импорта?
Zoho Analytics поддерживает несколько режимов: APPEND (только добавление), TRUNCATEADD (очистить и добавить), UPDATEADD (обновить существующие записи по ключу или добавить новые). Для инкрементальной синхронизации нужен UPDATEADD с ключом lead_id.
Как настроить автоматический refresh без cron?
Zoho Analytics поддерживает Data Sync - настроить расписание прямо в интерфейсе для SQL-источников. Для Kommo нужен Data Bridge (on-premise компонент) или внешний ETL. Простейший вариант - GitHub Actions или любой scheduler.
Есть ли лимиты на Zoho Analytics Import API?
Да: 10 запросов в минуту для Import API, максимум 25 000 строк за один импорт. При большом объёме данных разбивайте импорт на батчи по 5 000-10 000 строк.
Итог
Kommo + Zoho Analytics:
- OAuth 2.0 Client Credentials + Refresh Token для авторизации
- Kommo paginated API (
updated_at[from]) -> инкрементальная выборка - Import API с
ZOHO_IMPORT_TYPE=UPDATEADD-> upsert по lead_id - Дашборды в Zoho Analytics с SQL-запросами поверх данных Kommo
Если вы используете Zoho-экосистему и хотите объединить CRM-данные Kommo с другими Zoho-сервисами - обратитесь в Exceltic.dev.