Why the native integration doesn’t work
Plecto is a Danish company specializing in real-time performance dashboards for sales teams. Its key features are TV dashboards for the office (a large screen with metrics), gamification (leaderboards, achievement badges), and instant notifications when KPIs are hit. Plecto is popular in the Nordics and Western Europe among B2B sales teams.
There is no ready-made native Plecto + Kommo integration. Plecto has direct connectors for Salesforce, HubSpot, and Pipedrive. For Kommo, a custom approach is needed.
Plecto supports a Custom Datasource - a mechanism where Plecto periodically (every few minutes) polls your REST API endpoint and receives data in a specified format. This opens up a path: create a Python service that aggregates data from the Kommo API and serves it to Plecto in the required structure.
For companies that build custom integrations for Kommo, this approach is a standard ETL pattern with caching.
What is implemented - solution architecture
Plecto (every 5 minutes)
  --> GET /api/plecto/kommo-kpi
    --> Python service (with cache)
      --> Kommo API: GET /leads (current day)
      --> Kommo API: GET /leads (pipeline by stage)
      --> Kommo API: GET /leads (by manager)
  <-- JSON in Plecto Custom Widget format
Technical details
Plecto Custom Datasource. Configured in Plecto -> Data Sources -> Add -> Custom (REST). Plecto makes a GET request to your endpoint. Expected response format: a JSON array of objects. Each object is one data row with the fields date, metric, value, and employee (optional).
Plecto Custom Widget JSON format:
[
  {
    "date": "2026-06-01T10:00:00Z",
    "employee": "John Smith",
    "metric": "deals_won",
    "value": 3
  }
]
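As a minimal sketch of how rows in this shape could be produced consistently, the helper below builds one row and rejects values that would not aggregate as numbers. The name `make_row` is hypothetical and not part of Plecto or Kommo; only the four field names come from the format above.

```python
from datetime import datetime, timezone


def make_row(when: datetime, employee: str, metric: str, value) -> dict:
    """Build one row in the date/employee/metric/value shape shown above."""
    # bool is a subclass of int, so reject it explicitly
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        raise ValueError(f"value must be numeric, got {value!r}")
    if when.tzinfo is None:
        raise ValueError("when must be timezone-aware")
    return {
        # Normalize to UTC so the trailing "Z" is truthful
        "date": when.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "employee": employee,
        "metric": metric,
        "value": value,
    }


row = make_row(
    datetime(2026, 6, 1, 10, 0, tzinfo=timezone.utc), "John Smith", "deals_won", 3
)
```

Requiring a timezone-aware datetime avoids silently labeling local time as UTC.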
Kommo API for aggregation. We use several endpoints:
- GET /api/v4/leads with filter filter[closed_at][from]={today_unix} - deals closed today
- GET /api/v4/leads with filter[statuses][][pipeline_id]={id}&filter[statuses][][status_id]={id} - deals by stage
- GET /api/v4/users - list of managers
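The query strings for these endpoints can be sketched as plain dictionaries that are URL-encoded at request time. The function names and the `example.kommo.com` domain are hypothetical; the filter keys are copied from the endpoints listed above and should be checked against the Kommo API reference for your account.

```python
from urllib.parse import urlencode


def won_today_params(today_unix: int, limit: int = 250) -> dict:
    """Query parameters for deals closed since the start of today."""
    return {"filter[closed_at][from]": today_unix, "limit": limit}


def stage_params(pipeline_id: int, status_id: int, limit: int = 250) -> dict:
    """Query parameters for deals in one stage of one pipeline."""
    return {
        "filter[statuses][][pipeline_id]": pipeline_id,
        "filter[statuses][][status_id]": status_id,
        "limit": limit,
    }


def leads_url(domain: str, params: dict) -> str:
    """Full request URL; urlencode percent-encodes the square brackets."""
    return f"https://{domain}/api/v4/leads?{urlencode(params)}"


url = leads_url("example.kommo.com", won_today_params(1780272000))
```

When the request goes through `requests.get(url, params=...)`, as in the service in Step 1, the same dictionaries can be passed directly and the encoding is done for you.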
Caching. Kommo API has a rate limit of 7 requests/second. With frequent requests from Plecto, data should be cached in memory or Redis with a TTL of 2-5 minutes.
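The in-memory variant of this idea can be sketched as a small single-value TTL cache. The class name `TTLCache` and the injectable `clock` parameter are illustrative choices, not something Plecto or Kommo provides; the clock is injectable only so that expiry can be exercised without waiting.

```python
import time
from typing import Any, Callable


class TTLCache:
    """Cache one computed value and reload it after `ttl` seconds."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl
        self._clock = clock
        self._value: Any = None
        self._expires_at = float("-inf")  # forces a load on first access

    def get(self, loader: Callable[[], Any]) -> Any:
        now = self._clock()
        if now >= self._expires_at:
            # Expired or never loaded: call the expensive loader once
            self._value = loader()
            self._expires_at = now + self.ttl
        return self._value


# Demonstration with a fake clock instead of real waiting
fake_now = [0.0]
load_count = [0]


def loader() -> int:
    load_count[0] += 1
    return load_count[0]


cache = TTLCache(ttl=300, clock=lambda: fake_now[0])
first = cache.get(loader)   # loads
fake_now[0] = 299.0
second = cache.get(loader)  # still cached
fake_now[0] = 300.0
third = cache.get(loader)   # TTL reached, reloads
```

A process-local cache like this is lost on restart and is not shared between workers; Redis covers those cases at the cost of one more dependency.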
Step-by-step implementation
Step 1. FastAPI service with caching
import os
import secrets
import time
from datetime import datetime, date

import requests
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBasic, HTTPBasicCredentials

app = FastAPI()
security = HTTPBasic()

KOMMO_DOMAIN = os.environ["KOMMO_DOMAIN"]
KOMMO_TOKEN = os.environ["KOMMO_ACCESS_TOKEN"]
PLECTO_AUTH_USER = os.environ.get("PLECTO_AUTH_USER", "plecto")
PLECTO_AUTH_PASS = os.environ.get("PLECTO_AUTH_PASS", "secret")

# Simple in-memory cache
_cache: dict = {"data": None, "timestamp": 0, "ttl": 300}  # 5 minutes


def authenticate(credentials: HTTPBasicCredentials = Depends(security)):
    """Basic authentication for the Plecto endpoint."""
    # compare_digest rejects non-ASCII str values, so compare UTF-8 bytes
    correct_user = secrets.compare_digest(
        credentials.username.encode("utf-8"), PLECTO_AUTH_USER.encode("utf-8")
    )
    correct_pass = secrets.compare_digest(
        credentials.password.encode("utf-8"), PLECTO_AUTH_PASS.encode("utf-8")
    )
    if not (correct_user and correct_pass):
        raise HTTPException(status_code=401, detail="Unauthorized")


def kommo_get(path: str, params: dict = None) -> dict:
    """Request to Kommo API; returns {} on an error or an empty response."""
    url = f"https://{KOMMO_DOMAIN}/api/v4{path}"
    headers = {"Authorization": f"Bearer {KOMMO_TOKEN}"}
    r = requests.get(url, params=params or {}, headers=headers, timeout=15)
    # A 204 No Content reply has no body, so r.json() would raise on it
    if not r.ok or r.status_code == 204:
        return {}
    return r.json()


def get_today_timestamp() -> int:
    """Unix timestamp for start of today (UTC)."""
    # .timestamp() on a naive utcnow() value is interpreted as local time,
    # so round the current epoch down to a whole UTC day instead.
    now = int(time.time())
    return now - now % 86400
def fetch_kommo_kpi() -> list[dict]:
    """Aggregate KPIs from Kommo API."""
    today_ts = get_today_timestamp()
    # Build both date strings from UTC so they agree with the "Z" suffix
    today_str = time.strftime("%Y-%m-%d", time.gmtime())
    now_str = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    result = []

    # 1. Deals Won today (by manager)
    won_params = {
        "filter[closed_at][from]": today_ts,
        "filter[statuses][][status_id]": 142,  # Won
        "limit": 250,
        "with": "loss_reason",
    }
    won_data = kommo_get("/leads", won_params)
    won_leads = won_data.get("_embedded", {}).get("leads", [])

    # Group by responsible_user_id
    won_by_user: dict[int, int] = {}
    won_value_by_user: dict[int, int] = {}
    for lead in won_leads:
        uid = lead.get("responsible_user_id", 0)
        won_by_user[uid] = won_by_user.get(uid, 0) + 1
        # Treat a missing or null price as 0
        won_value_by_user[uid] = won_value_by_user.get(uid, 0) + (lead.get("price") or 0)

    # Get user names
    users_data = kommo_get("/users")
    users = {u["id"]: u["name"] for u in users_data.get("_embedded", {}).get("users", [])}

    for uid, count in won_by_user.items():
        employee = users.get(uid, f"User {uid}")
        result.append({
            "date": f"{today_str}T00:00:00Z",
            "employee": employee,
            "metric": "deals_won_today",
            "value": count,
        })
        result.append({
            "date": f"{today_str}T00:00:00Z",
            "employee": employee,
            "metric": "won_value_today",
            "value": won_value_by_user.get(uid, 0),
        })

    # 2. Pipeline value by stage (current snapshot)
    pipeline_data = kommo_get("/leads", {
        "filter[statuses][][status_id]": "!142,!143",  # exclude Won and Lost
        "limit": 250,
    })
    pipeline_leads = pipeline_data.get("_embedded", {}).get("leads", [])

    # Get statuses
    statuses_data = kommo_get("/pipelines", {"with": "statuses"})
    status_names: dict[int, str] = {}
    for pipeline in statuses_data.get("_embedded", {}).get("pipelines", []):
        for status in pipeline.get("_embedded", {}).get("statuses", []):
            status_names[status["id"]] = status["name"]

    stage_values: dict[int, int] = {}
    stage_counts: dict[int, int] = {}
    for lead in pipeline_leads:
        sid = lead.get("status_id", 0)
        # Guard in case the exclusion filter above is not honored by the API
        if sid in (142, 143):
            continue
        stage_values[sid] = stage_values.get(sid, 0) + (lead.get("price") or 0)
        stage_counts[sid] = stage_counts.get(sid, 0) + 1

    for sid, value in stage_values.items():
        stage_name = status_names.get(sid, f"Stage {sid}")
        stage_key = stage_name.lower().replace(" ", "_")
        result.append({
            "date": now_str,
            "employee": "Team",
            "metric": f"pipeline_{stage_key}_value",
            "value": value,
        })
        result.append({
            "date": now_str,
            "employee": "Team",
            "metric": f"pipeline_{stage_key}_count",
            "value": stage_counts.get(sid, 0),
        })

    # 3. Total pipeline value
    total_pipeline = sum(stage_values.values())
    result.append({
        "date": now_str,
        "employee": "Team",
        "metric": "total_pipeline_value",
        "value": total_pipeline,
    })
    return result
@app.get("/api/plecto/kommo-kpi")
def get_kommo_kpi(_: None = Depends(authenticate)):
    """Endpoint for Plecto Custom Datasource."""
    now = time.time()
    # Call Kommo only when the cache is empty or older than the TTL
    if _cache["data"] is None or (now - _cache["timestamp"]) > _cache["ttl"]:
        _cache["data"] = fetch_kommo_kpi()
        _cache["timestamp"] = now
    return _cache["data"]


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
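The service above sends limit=250 and reads a single page, so accounts with more matching deals would be undercounted. A hedged sketch of paging through results follows. It assumes the Kommo list endpoints accept a `page` query parameter alongside `limit`, which should be confirmed in the Kommo API reference. `fetch_all_leads` and `max_pages` are hypothetical names, and `get_page` stands for any callable with the same signature as `kommo_get`.

```python
from typing import Callable


def fetch_all_leads(
    get_page: Callable[[str, dict], dict],
    params: dict,
    limit: int = 250,
    max_pages: int = 20,
) -> list:
    """Collect leads page by page until a short or empty page is returned."""
    leads = []
    for page in range(1, max_pages + 1):
        data = get_page("/leads", {**params, "limit": limit, "page": page})
        batch = data.get("_embedded", {}).get("leads", [])
        leads.extend(batch)
        # A page with fewer rows than the limit is the last one
        if len(batch) < limit:
            break
    return leads


# Stand-in for kommo_get so the sketch runs without network access
ALL_LEADS = [{"id": i} for i in range(1, 6)]
requested_pages = []


def fake_get_page(path: str, params: dict) -> dict:
    requested_pages.append(params["page"])
    start = (params["page"] - 1) * params["limit"]
    chunk = ALL_LEADS[start:start + params["limit"]]
    return {"_embedded": {"leads": chunk}} if chunk else {}


collected = fetch_all_leads(fake_get_page, {}, limit=2)
```

The `max_pages` cap keeps one refresh from turning into an unbounded burst of requests against the rate limit.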
Step 2. Configure Plecto Custom Datasource
- In Plecto: Data Sources -> Add -> Custom (REST)
- URL: https://your-service.example.com/api/plecto/kommo-kpi
- Authentication: Basic Auth (username/password from PLECTO_AUTH_USER/PASS)
- Refresh interval: 5 minutes
- Date field: date
- Numeric fields: value
- Text fields: employee, metric
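To check the endpoint by hand before pointing Plecto at it, it helps to know what a Basic Auth request carries. The sketch below builds the Authorization header defined by the HTTP Basic scheme; `basic_auth_header` is a hypothetical helper, and the credentials are the default values from Step 1, which should be replaced in production.

```python
import base64


def basic_auth_header(username: str, password: str) -> dict:
    """Return the header a Basic Auth client sends: base64 of 'user:pass'."""
    raw = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")
    return {"Authorization": f"Basic {token}"}


header = basic_auth_header("plecto", "secret")
```

The resulting dictionary can be passed as `headers=` to `requests.get` against your service URL. A 401 response means the username or password does not match the environment variables.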
Step 3. Create widgets in Plecto
After connecting the datasource, create widgets:
- Number widget with formula SUM(value) WHERE metric = 'deals_won_today' - deals closed today
- Leaderboard grouped by employee, metric deals_won_today - manager rankings
- Bar chart with metric pipeline_*_count - number of deals by stage
- KPI Target with a daily goal and current value won_value_today
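Before building the widgets, the expected numbers can be reproduced locally from the endpoint's JSON, which makes it easier to tell a widget misconfiguration from a data problem. This sketch mirrors the Number widget formula and the Leaderboard grouping in plain Python. The function names and the sample rows are made up for illustration and say nothing about how Plecto evaluates formulas internally.

```python
from collections import defaultdict


def sum_metric(rows: list, metric: str):
    """Equivalent of SUM(value) WHERE metric = <metric>."""
    return sum(r["value"] for r in rows if r["metric"] == metric)


def leaderboard(rows: list, metric: str) -> list:
    """Total per employee for one metric, highest first."""
    totals = defaultdict(int)
    for r in rows:
        if r["metric"] == metric:
            totals[r["employee"]] += r["value"]
    return sorted(totals.items(), key=lambda item: item[1], reverse=True)


# Illustrative rows in the shape returned by /api/plecto/kommo-kpi
sample_rows = [
    {"date": "2026-06-01T00:00:00Z", "employee": "Anna", "metric": "deals_won_today", "value": 3},
    {"date": "2026-06-01T00:00:00Z", "employee": "Anna", "metric": "won_value_today", "value": 4500},
    {"date": "2026-06-01T00:00:00Z", "employee": "Ben", "metric": "deals_won_today", "value": 1},
]

total_won = sum_metric(sample_rows, "deals_won_today")
ranking = leaderboard(sample_rows, "deals_won_today")
```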
Real case with numbers
For an EU B2B company with an office in Amsterdam (8 SDRs, 1 sales manager), a Plecto TV dashboard with Kommo data creates a transparency and competition effect.
Before the integration: the manager reviewed Kommo reports at end of day. SDRs couldn’t see each other’s progress in real time. The team result was unknown until the final standup.
After the integration: a leaderboard on the office TV screen updates every 5 minutes. When a deal is closed, Plecto plays a sound notification and animation. Based on feedback from similar teams, gamification creates a visible effect on SDR activity in the last 2-3 hours of the workday - precisely when energy typically drops.
Who this is for
The integration is relevant for companies that:
- Have a physical office where a TV dashboard can be displayed
- Use Kommo as the primary CRM for the sales team
- Want to introduce gamification and KPI transparency without switching to a new CRM
- Operate in Western Europe, where Plecto is known as an EU-native tool (GDPR-compliant, EU servers)
If you need deeper analytics - Kommo + Metabase or Kommo + Power BI offer more ad-hoc analysis capabilities.
Frequently asked questions
Does Plecto support historical data or only real-time?
Plecto supports both. If your endpoint returns data with a date field for past periods, Plecto builds charts for the week/month. To enable this, extend fetch_kommo_kpi() to retrieve data for N days.
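One way to extend the aggregation to N days is to compute a UTC window per day and run the same query for each window. The sketch below only generates the windows. `day_windows` is a hypothetical helper, and using the end of each window as an upper bound assumes Kommo accepts filter[closed_at][to] next to filter[closed_at][from], which should be verified in the API reference.

```python
import time

DAY_SECONDS = 86400


def day_windows(n_days: int, now=None) -> list:
    """Return (date_label, start_ts, end_ts) for today and the previous days, in UTC."""
    now = int(time.time()) if now is None else int(now)
    today_start = now - now % DAY_SECONDS  # midnight UTC of the current day
    windows = []
    for offset in range(n_days):
        start = today_start - offset * DAY_SECONDS
        label = time.strftime("%Y-%m-%dT00:00:00Z", time.gmtime(start))
        windows.append((label, start, start + DAY_SECONDS - 1))
    return windows


# Fixed "now" of 2026-06-01 10:00:00 UTC so the output is reproducible
windows = day_windows(2, now=1780308000)
```

Each label can be used as the row's date field, so every past day gets its own row per employee and metric. Note that N days means roughly N times as many Kommo requests per refresh.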
How to add a “number of calls” metric?
If OpenPhone or Aircall is integrated with Kommo and logs calls as notes, you can count notes of type call_in and call_out for the day via GET /api/v4/leads/{id}/notes. This takes one request per lead, so it is more work to build and heavier on the rate limit, but feasible.
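The counting step can be sketched independently of how the notes are fetched. The field names `note_type`, `created_at`, and `responsible_user_id` are assumed to match Kommo's note objects, and `count_calls` is a hypothetical helper; check one real response before relying on it.

```python
from collections import Counter

CALL_TYPES = ("call_in", "call_out")


def count_calls(notes: list, day_start: int, day_end: int) -> dict:
    """Count call notes per responsible user within [day_start, day_end]."""
    counts = Counter()
    for note in notes:
        is_call = note.get("note_type") in CALL_TYPES
        in_window = day_start <= note.get("created_at", 0) <= day_end
        if is_call and in_window:
            counts[note.get("responsible_user_id", 0)] += 1
    return dict(counts)


# Illustrative notes: two calls today, one text note, one call from an earlier day
sample_notes = [
    {"note_type": "call_in", "created_at": 1780300000, "responsible_user_id": 11},
    {"note_type": "call_out", "created_at": 1780305000, "responsible_user_id": 11},
    {"note_type": "common", "created_at": 1780306000, "responsible_user_id": 11},
    {"note_type": "call_out", "created_at": 1780100000, "responsible_user_id": 12},
]

calls_today = count_calls(sample_notes, 1780272000, 1780358399)
```

Each entry in the result maps to one row with a metric such as calls_today and the user's name as employee.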
What is the Kommo API rate limit and will frequent requests cause issues?
Kommo API: 7 requests/second. With cache TTL of 5 minutes, your endpoint makes 3-5 requests to Kommo on each refresh - well within the limit. Without caching, frequent requests from Plecto will result in 429 errors.
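The arithmetic behind "well within the limit" can be written out as a quick check. It takes the upper figure of 5 requests per refresh and the 7 requests/second limit from the answer above; the function name is illustrative.

```python
def kommo_requests_per_hour(cache_ttl_s: int, requests_per_refresh: int) -> int:
    """Upper bound on Kommo calls per hour when the cache refreshes once per TTL."""
    refreshes_per_hour = 3600 // cache_ttl_s
    return refreshes_per_hour * requests_per_refresh


used = kommo_requests_per_hour(cache_ttl_s=300, requests_per_refresh=5)
allowed = 7 * 3600  # 7 requests/second sustained for an hour
```

With these inputs the service uses at most 60 requests per hour against an allowance of 25,200. The only way to hit the limit is a burst, for example many uncached calls arriving within the same second.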
Can data from multiple pipelines be shown simultaneously?
Yes. Fetch the list of pipelines via GET /api/v4/pipelines, iterate over each, and add a pipeline_name field to the JSON for Plecto. Then filter the widget in Plecto by the specific pipeline.
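A minimal sketch of that tagging step is shown below. It assumes each lead carries a `pipeline_id` field and that the pipelines response has the `_embedded.pipelines` shape used in Step 1. The helper names and the `pipeline_value` metric are hypothetical.

```python
def pipeline_names(pipelines_response: dict) -> dict:
    """Map pipeline id -> name from a GET /api/v4/pipelines response."""
    pipelines = pipelines_response.get("_embedded", {}).get("pipelines", [])
    return {p["id"]: p["name"] for p in pipelines}


def value_by_pipeline(leads: list, names: dict, date_str: str) -> list:
    """One row per pipeline with the summed deal value and a pipeline_name field."""
    totals = {}
    for lead in leads:
        pid = lead.get("pipeline_id", 0)
        # Treat a missing or null price as 0
        totals[pid] = totals.get(pid, 0) + (lead.get("price") or 0)
    return [
        {
            "date": date_str,
            "employee": "Team",
            "metric": "pipeline_value",
            "pipeline_name": names.get(pid, f"Pipeline {pid}"),
            "value": total,
        }
        for pid, total in totals.items()
    ]


# Illustrative data
names = pipeline_names(
    {"_embedded": {"pipelines": [{"id": 1, "name": "Inbound"}, {"id": 2, "name": "Outbound"}]}}
)
rows = value_by_pipeline(
    [
        {"pipeline_id": 1, "price": 100},
        {"pipeline_id": 1, "price": None},
        {"pipeline_id": 2, "price": 250},
    ],
    names,
    "2026-06-01T10:00:00Z",
)
```

If pipeline_name is added, it also needs to be registered as a text field in the datasource settings so the widget filter can see it.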
If you need a Kommo + Plecto integration - describe your stack and team at Exceltic.dev. We’ll review the architecture and estimate the scope of work.