2026-02-14 · 架构
32
架构 · 2026-02-14

CVAT 深度研究:从部署安装到 API 自动化与源码链路(工程实战版)

适用对象:平台工程师、数据工程师、算法工程师、标注团队负责人、技术管理者。
版本基线:本文以 CVAT 社区版当前主线能力为基础,结合公开 OpenAPI(/api/schema/,示例版本 2.56.1)与仓库代码结构进行工程化拆解。不同版本字段可能有细微差异,请以你实例的 /api/swagger 为准。


摘要

CVAT(Computer Vision Annotation Tool)是当前工业界最常见的开源视觉标注平台之一。它的价值不在“能画框”本身,而在于:任务拆分、协作流程、质量控制、数据进出、自动化接口、可运维性这几件事能否形成闭环。很多团队在试用时觉得“挺好用”,但一旦上规模(多人、多项目、多格式、跨地域、长期迭代),就会遇到同一批问题:部署结构不清晰、数据存储混乱、导入导出不可控、API 调用不幂等、任务质量难量化、升级风险大。

本文从工程落地视角,对 CVAT 做一次完整深挖,覆盖:

  1. 架构与组件职责(UI / Server / Worker / DB / Queue / Cache / Analytics / OPA)。
  2. Docker Compose 的开发与生产部署方法,以及差异化配置。
  3. 生产化建议(存储、备份、监控、权限、安全、升级)。
  4. 标注流程最佳实践(项目、任务、作业、审核、质量控制、共识标注)。
  5. REST API 接入(认证、项目/任务创建、数据上传、导入导出、错误处理、幂等)。
  6. 一套可运行的 Python 自动化脚本:批量创建任务 + 上传数据 + 设置标签 + 触发导入 + 轮询状态。
  7. 源码链路:请求入口 → 序列化 → 业务层 → 异步队列 → 存储落盘。
  8. 性能调优与故障排查手册。
  9. 与 Label Studio 的工程视角对比。
  10. 企业 30/60/90 天落地路线图。

目标不是“会用 CVAT”,而是让你团队能把 CVAT 变成一条稳定、可度量、可持续演进的数据生产线。


目录


1. 架构总览

1.1 核心组件分层

CVAT 的核心不是“单体 Web 应用”,而是一个围绕异步任务构建的数据处理系统。典型 Docker Compose 拓扑包含:

1.2 逻辑架构图

[架构图占位]

可以把 CVAT 理解为两条主链路:

  1. 同步链路
  2. UI / 脚本调用 API
  3. API 做参数校验、权限校验
  4. 直接读写 DB 返回结果
  5. 适合列表查询、详情查看、元数据修改等轻操作

  6. 异步链路

  7. API 接收“重任务”(数据上传后处理、导出、导入、质量计算、共识合并)
  8. 生成 rq_id
  9. 投递到 RQ 队列
  10. 对应 worker 执行
  11. 客户端通过 /api/requests/{rq_id} 轮询进度

结论:在工程集成中,所有重量级操作都应按“异步任务”设计,而不是同步阻塞等待。

1.3 数据模型:Project / Task / Job

CVAT 的协作基本单位有三层:

补充两条质量相关能力:

1.4 存储视角

在默认 Compose 下,关键卷包括:

源码里 Data 模型会将任务原始上传路径落在类似:

因此,磁盘设计必须分离“结构化元数据(DB)”与“大文件介质(data)”,否则备份恢复会非常痛苦。


2. Docker Compose 部署:开发与生产差异

2.1 最小可用部署(单机)

git clone https://github.com/cvat-ai/cvat.git
cd cvat
export CVAT_HOST=your.domain.or.ip
docker compose up -d

docker exec -it cvat_server bash -ic 'python3 ~/manage.py createsuperuser'

访问:http://<CVAT_HOST>:8080

2.2 Compose 文件分层策略

生产中不要只用一个 docker-compose.yml,建议叠加:

  1. 基础层docker-compose.yml
  2. 包含完整核心服务。
  3. 开发层docker-compose.dev.yml
  4. 本地 build、调试端口(如 9090/9091/9092...)。
  5. HTTPS 层docker-compose.https.yml
  6. Traefik 开 80/443,自动证书(ACME)。
  7. 外部数据库层docker-compose.external_db.yml
  8. 替换内置 DB,使用外部 PostgreSQL。
  9. 自动标注层(可选)components/serverless/docker-compose.serverless.yml
  10. 引入 Nuclio。

示例:开发调试

docker compose \
  -f docker-compose.yml \
  -f docker-compose.dev.yml \
  up -d

示例:生产 HTTPS

export CVAT_HOST=cvat.example.com
export ACME_EMAIL=ops@example.com

docker compose \
  -f docker-compose.yml \
  -f docker-compose.https.yml \
  up -d

示例:外部 PostgreSQL

export CVAT_POSTGRES_HOST=pg-rw.internal
export CVAT_POSTGRES_DBNAME=cvat
export CVAT_POSTGRES_USER=cvat
export CVAT_POSTGRES_PASSWORD='***'

docker compose \
  -f docker-compose.yml \
  -f docker-compose.external_db.yml \
  up -d

2.3 开发与生产差异(必须理解)

维度
开发环境
生产环境

镜像来源
本地 build(便于改代码)
固定版本 tag(可追溯)

暴露端口
DB/Redis/调试端口常对本机开放
仅暴露 80/443;内部端口内网隔离

证书
可 HTTP
强制 HTTPS

日志级别
DEBUG 可接受
INFO/WARN,结构化日志落地

数据卷
可临时
必须持久化、可备份、可迁移

密钥管理
环境变量可凑合
Secret 管理(Vault/KMS/K8s Secret)

数据库
可内置容器
推荐外部托管 PostgreSQL(备份与高可用)

升级策略
直接拉新镜像
预生产验证 + 回滚方案 + 变更窗口

2.4 生产 override 示例(建议模板)

以下仅示意关键点:绑定稳定目录、限制暴露、收敛并发、显式主机名与密钥。

services:
  cvat_server:
    environment:
      ALLOWED_HOSTS: "cvat.example.com"
      DJANGO_SECRET_KEY: "replace-with-long-random-secret"
      ONE_RUNNING_JOB_IN_QUEUE_PER_USER: "true"
    restart: always

  cvat_worker_import:
    environment:
      NUMPROCS: 2

  cvat_worker_export:
    environment:
      NUMPROCS: 2

  cvat_worker_chunks:
    environment:
      NUMPROCS: 2

  traefik:
    ports:
      - "80:80"
      - "443:443"

volumes:
  cvat_data:
    driver_opts:
      type: none
      o: bind
      device: /data/cvat/data
  cvat_db:
    driver_opts:
      type: none
      o: bind
      device: /data/cvat/db
  cvat_logs:
    driver_opts:
      type: none
      o: bind
      device: /data/cvat/logs
  cvat_keys:
    driver_opts:
      type: none
      o: bind
      device: /data/cvat/keys

2.5 部署验收清单(上线前)

最少做这 10 条:

  1. docker compose ps 全部服务 healthyrunning
  2. 能登录超级管理员;能创建普通用户。
  3. 创建任务并上传 50 张图,任务成功切片。
  4. 导出一个小数据集并成功下载。
  5. /api/requests 可看到异步任务状态。
  6. 断开某 worker 后能发现告警(监控验证)。
  7. 备份脚本跑通一次并可恢复到新环境。
  8. HTTPS 证书有效且自动续签可验证。
  9. 普通 worker 账号权限受限(最小权限)。
  10. 升级回滚预案有实操记录。

2.6 何时从 Compose 迁移到 K8s

当出现以下信号,可考虑 Helm/K8s:

2.7 离线/内网(Air-gapped)部署思路

很多政企或涉密场景无法联网,这时应把“能跑起来”升级成“可持续更新”。推荐落地顺序:

  1. 镜像离线化:提前在外网环境拉取 CVAT 全量镜像(含依赖组件),通过 docker save/load 或私有镜像仓库(Harbor)导入。
  2. 依赖镜像源内网化:将 Python 包、系统包、前端依赖转为企业内部源,避免临时联网安装失败。
  3. 证书与域名内网化:内部 CA 签发证书,统一入口域名与证书轮换策略。
  4. 升级包版本锁定:在 release 清单中冻结镜像 tag、迁移脚本版本、配置模板版本,避免“同名 tag 漂移”。
  5. 离线备份演练:至少演练两类恢复:
  6. 平台级恢复(DB + volume)
  7. 业务级恢复(task/project backup)

内网部署最容易忽略的是“升级与补丁通道”。建议在 DMZ 或专用同步节点建立受控镜像同步流程,做到“可审计、可回滚、可追责”。


3. 生产化建议(存储、备份、监控、权限、安全、升级)

3.1 存储设计

3.1.1 分层原则

不要把三层混在同一块小盘,否则任何一层爆满都会拖垮系统。

3.1.2 容量估算方法(实战)

假设:

则仅原始数据约:

200000 × 1.2MB × 180 ≈ 43.2TB

再叠加缓存与冗余,建议按 1.3~1.6 倍 预留。很多团队栽在“只算原图,不算导出与缓存”。

3.2 备份与恢复

官方备份指南强调核心卷:cvat_dbcvat_datacvat_events_db。工程实践建议再加:cvat_keys

3.2.1 备份策略建议

3.2.2 恢复关键点

  1. 先用同版本 CVAT 恢复(避免 schema 不兼容)。
  2. 恢复完成并验证后再升级版本。
  3. 恢复后做抽样校验:
  4. 任务数量
  5. 标签一致性
  6. 随机任务可打开并显示帧
  7. 导出可用

3.3 监控与可观测性

CVAT 内置 analytics 体系(Vector + ClickHouse + Grafana),可用来做行为与性能分析。生产中建议补齐三层监控:

  1. 基础设施层:CPU、内存、磁盘、网络、IOPS。
  2. 服务层:容器重启、worker 存活、队列长度、任务耗时。
  3. 业务层:标注吞吐、返工率、质检得分、导出成功率。

关键 SLI(建议至少看这几个):

3.4 权限与组织模型

CVAT 有全局角色与组织角色(Owner/Maintainer/Supervisor/Worker)。建议:

OPA 策略可做高级定制(.rego 规则),但变更前必须在预生产验证。

3.5 安全基线

最低安全要求:

  1. 强制 HTTPS(docker-compose.https.yml)。
  2. ALLOWED_HOSTS 不要用 *(生产必须收敛域名)。
  3. DJANGO_SECRET_KEY 长随机且安全存储。
  4. 仅暴露必要入口端口,DB/Redis 不对公网。
  5. Access Token 设置过期时间,优先 read_only 令牌用于只读场景。
  6. 审计关键操作(创建/删除任务、导出、权限变更),结合事件日志和 X-Request-Id 追踪。

3.6 升级策略

标准流程:

  1. 备份(全量 + 恢复演练记录)。
  2. 预生产演练升级并跑回归(API、导入导出、质检)。
  3. 生产停机窗口内 down -> pull/build -> up
  4. 观察 DB migration 日志(docker logs cvat_server -f)。
  5. 若异常,按预案回滚。

注意:历史版本有 PostgreSQL 大版本切换等特殊步骤,必须看官方 upgrade guide。


4. 使用流程最佳实践(项目/任务/作业/审核/质量)

4.1 标签治理优先于“赶进度”

很多返工并不是标注员画错,而是标签体系设计烂:

建议建立《标签规范 v1》并固定:

4.2 Project 与 Task 切分策略

推荐原则:

典型切分维度:

4.3 Job 切分参数:Segment Size 与 Overlap

经验值(仅起点,需按业务调):

4.4 审核流:Annotation → Validation → Acceptance

建议把 stage 与角色职责绑定:

不要让“谁都能改 stage”成为常态,会导致责任边界混乱。

4.5 质量控制(GT/Honeypot/即时反馈/共识)

4.5.1 Ground Truth / Honeypot

在任务创建时配置验证模式,能把质检前移。已有任务可新增 GT job(功能限制以版本为准)。

4.5.2 即时反馈

设置 Max validations per job > 0 后,可在作业完成时触发即时验证反馈,快速降低低级错误流入下一环节。

4.5.3 共识标注(Consensus)

适合高价值数据:

这不是“全量都上”的功能,而是成本换质量,建议用于:

4.6 质量 KPI 建议

至少做四类指标:

  1. 生产效率:日均完成量、P50/P95 作业时长。
  2. 质量稳定性:抽检通过率、返工率、低分对象占比。
  3. 流程健康度:stage 转换周期、阻塞作业数。
  4. 自动化成熟度:API 覆盖率、人工干预比例。

5. REST API 接入实践

5.1 先看你实例自己的 API 文档

原则:代码里依赖的字段,先以你实例的 schema 为准,不要盲抄网络帖子。

5.2 认证方式

CVAT 支持多种认证方式:

  1. POST /api/auth/login 获取 Token(Authorization: Token <key>,兼容旧方式)。
  2. Access Token(推荐自动化):Authorization: Bearer <token>
  3. Session + CSRF(浏览器场景)。

5.2.1 登录示例

curl -X POST 'https://cvat.example.com/api/auth/login' \
  -H 'Content-Type: application/json' \
  -d '{"username": "admin", "password": "***"}'

5.2.2 Access Token(推荐)

创建入口:/api/auth/access_tokens。可设置 expiry_dateread_only

5.3 组织上下文(多租户)

如果你启用了 organization,很多接口需要在 Header 里传:

自动化脚本若漏传,常见现象是“看不到任务”或权限异常。

5.4 典型 API 工作流

5.4.1 创建项目

POST /api/projects

关键字段:

5.4.2 创建任务

POST /api/tasks

关键字段:

5.4.3 上传数据到任务

POST /api/tasks/{id}/data/

支持两类方式:

请求成功通常返回 202 + rq_id,需轮询。

注意:任务一旦附加数据,不能直接“替换数据”;重复上传会报错(典型提示:Adding more data is not supported)。

5.4.4 触发注释导入

POST /api/tasks/{id}/annotations?format=<FORMAT>

可返回 202 + rq_id(异步)或直接 201(已完成)。

5.4.5 导出数据集

POST /api/tasks/{id}/dataset/export?format=<FORMAT>&save_images=true

返回 rq_id 后轮询 /api/requests/{rq_id},状态 finished 后可用 result_url 下载。

5.5 异步任务轮询范式

curl -H 'Authorization: Bearer <token>' \
  'https://cvat.example.com/api/requests/<rq_id>'

返回关键字段:

5.6 错误处理矩阵(建议落地)

状态码
常见场景
处理策略

400
参数错误、重复上传数据
立即失败,打印服务端 message,人工修正输入

401
未认证
重新登录或刷新 token

403
权限不足
检查组织角色、对象归属、X-Organization

404
资源不存在
检查 ID 与环境(测试/生产串线)

405
格式不可用
先调用 /api/server/annotation/formats 获取可用格式

409
同类任务已在处理中
复用返回的 rq_id 继续轮询(不要重复发起)

429
资源锁冲突/限流
Retry-After 退避重试

503
Redis/依赖服务不可用
触发运维告警,检查队列与基础服务

5.7 幂等设计(重点)

CVAT 接口本身不是全局“天然幂等”(没有通用 Idempotency-Key),需要客户端补齐:

  1. 业务主键幂等:用 project_name + task_name + batch_id 作为唯一键。
  2. 请求状态幂等:保存 rq_id,失败重试先查状态。
  3. 409 幂等:遇到 409 不重提,改为轮询既有任务。
  4. 状态落盘:本地 state.json 或数据库记录处理进度。
  5. 可重入脚本:中断后可从上次成功步骤恢复。

5.8 API 可靠性实践


6. Python 脚本:批量创建任务 + 上传数据 + 设置标签 + 触发导入 + 轮询状态

下面给出一套实战脚本(基于 requests,避免过重依赖),包含:

6.1 前置依赖

python3 -m venv .venv
source .venv/bin/activate
pip install requests

6.2 输入规范(JSON)

保存为 batch_spec.json

{
  "project": {
    "name": "traffic-2026q1",
    "labels": [
      {
        "name": "car",
        "color": "#ff0000",
        "attributes": [
          {
            "name": "occluded",
            "mutable": true,
            "input_type": "checkbox",
            "default_value": "false",
            "values": ["true", "false"]
          }
        ],
        "type": "rectangle"
      },
      {
        "name": "person",
        "type": "rectangle"
      }
    ]
  },
  "tasks": [
    {
      "name": "cam01-2026-02-01",
      "subset": "train",
      "segment_size": 500,
      "overlap": 10,
      "image_quality": 80,
      "use_cache": true,
      "data_globs": [
        "./datasets/cam01/2026-02-01/*.jpg"
      ],
      "annotation_file": "./annotations/cam01-2026-02-01-coco.zip",
      "annotation_format": "COCO 1.0"
    },
    {
      "name": "cam02-2026-02-01",
      "subset": "train",
      "segment_size": 500,
      "overlap": 10,
      "image_quality": 80,
      "use_cache": true,
      "data_globs": [
        "./datasets/cam02/2026-02-01/*.jpg"
      ]
    }
  ]
}

6.3 可运行脚本

文件建议:cvat_batch_tasks.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from __future__ import annotations

import argparse
import glob
import json
import mimetypes
import os
import sys
import time
from contextlib import ExitStack
from pathlib import Path
from typing import Any

import requests


class CVATApiError(RuntimeError):
    pass


class CVATClient:
    def __init__(
        self,
        host: str,
        *,
        token: str | None = None,
        username: str | None = None,
        password: str | None = None,
        organization: str | None = None,
        verify_ssl: bool = True,
        max_retries: int = 3,
    ):
        self.host = host.rstrip("/")
        self.username = username
        self.password = password
        self.max_retries = max_retries

        self.session = requests.Session()
        self.session.verify = verify_ssl
        self.session.headers.update(
            {
                "Accept": "application/vnd.cvat+json",
                "User-Agent": "cvat-batch-script/1.0",
            }
        )

        if organization:
            self.session.headers["X-Organization"] = organization

        if token:
            # Access Token 推荐用 Bearer;旧 token 可改成 Token 前缀
            self.session.headers["Authorization"] = f"Bearer {token}"

    def _request(self, method: str, path: str, *, timeout: int = 60, **kwargs) -> requests.Response:
        url = f"{self.host}{path}"
        last_exc: Exception | None = None

        for i in range(self.max_retries):
            try:
                resp = self.session.request(method, url, timeout=timeout, **kwargs)
                if resp.status_code >= 400:
                    detail = None
                    try:
                        detail = resp.json()
                    except Exception:
                        detail = resp.text[:1200]
                    raise CVATApiError(
                        f"{method} {url} -> {resp.status_code}, detail={detail}"
                    )
                return resp
            except (requests.ConnectionError, requests.Timeout) as ex:
                last_exc = ex
                if i == self.max_retries - 1:
                    raise
                sleep_s = 2 ** i
                print(f"[WARN] transient error: {ex}, retry in {sleep_s}s")
                time.sleep(sleep_s)

        assert last_exc is not None
        raise last_exc

    def _json(self, resp: requests.Response) -> Any:
        if not resp.content:
            return None
        ctype = resp.headers.get("Content-Type", "")
        if "json" in ctype:
            return resp.json()
        try:
            return resp.json()
        except Exception:
            return {"_raw": resp.text}

    def login_if_needed(self):
        if "Authorization" in self.session.headers:
            return
        if not self.username or not self.password:
            raise ValueError("Missing credentials: provide --token or --username/--password")

        payload = {"username": self.username, "password": self.password}
        resp = self._request("POST", "/api/auth/login", json=payload, timeout=30)
        data = self._json(resp)
        key = data.get("key")
        if not key:
            raise CVATApiError(f"Login succeeded but no token key returned: {data}")

        # /api/auth/login 返回的是旧式 Token;生产建议改用 Access Token(Bearer)
        self.session.headers["Authorization"] = f"Token {key}"

    def list_projects(self, name: str | None = None) -> list[dict[str, Any]]:
        params = {"page_size": 100}
        if name:
            params["name"] = name
        resp = self._request("GET", "/api/projects", params=params)
        return self._json(resp).get("results", [])

    def create_project(self, name: str, labels: list[dict[str, Any]] | None = None) -> dict[str, Any]:
        payload: dict[str, Any] = {"name": name}
        if labels:
            payload["labels"] = labels
        resp = self._request("POST", "/api/projects", json=payload)
        return self._json(resp)

    def get_or_create_project(self, name: str, labels: list[dict[str, Any]] | None = None) -> dict[str, Any]:
        projects = self.list_projects(name=name)
        for p in projects:
            if p.get("name") == name:
                return p
        print(f"[INFO] create project: {name}")
        return self.create_project(name=name, labels=labels)

    def list_tasks(self, *, name: str | None = None, project_id: int | None = None) -> list[dict[str, Any]]:
        params: dict[str, Any] = {"page_size": 100}
        if name:
            params["name"] = name
        if project_id is not None:
            params["project_id"] = project_id
        resp = self._request("GET", "/api/tasks", params=params)
        return self._json(resp).get("results", [])

    def find_task(self, name: str, project_id: int | None = None) -> dict[str, Any] | None:
        tasks = self.list_tasks(name=name, project_id=project_id)
        for t in tasks:
            if t.get("name") == name and (project_id is None or t.get("project_id") == project_id):
                return t
        return None

    def create_task(
        self,
        *,
        name: str,
        project_id: int | None,
        labels: list[dict[str, Any]] | None,
        subset: str | None,
        segment_size: int | None,
        overlap: int | None,
    ) -> dict[str, Any]:
        payload: dict[str, Any] = {"name": name}
        if project_id is not None:
            payload["project_id"] = project_id
        if labels and project_id is None:
            payload["labels"] = labels
        if subset:
            payload["subset"] = subset
        if segment_size is not None:
            payload["segment_size"] = segment_size
        if overlap is not None:
            payload["overlap"] = overlap

        resp = self._request("POST", "/api/tasks", json=payload)
        return self._json(resp)

    def attach_data(
        self,
        *,
        task_id: int,
        files: list[Path],
        image_quality: int = 80,
        use_cache: bool = True,
        sorting_method: str = "lexicographical",
        timeout: int = 3600,
    ) -> str:
        if not files:
            raise ValueError("attach_data: no files")

        with ExitStack() as stack:
            multipart = []
            for p in files:
                fp = stack.enter_context(open(p, "rb"))
                ctype = mimetypes.guess_type(str(p))[0] or "application/octet-stream"
                multipart.append(("client_files", (p.name, fp, ctype)))

            data = {
                "image_quality": str(image_quality),
                "use_cache": str(use_cache).lower(),
                "sorting_method": sorting_method,
            }

            resp = self._request(
                "POST",
                f"/api/tasks/{task_id}/data/",
                data=data,
                files=multipart,
                timeout=timeout,
            )
            payload = self._json(resp)
            rq_id = payload.get("rq_id")
            if not rq_id:
                raise CVATApiError(f"attach_data no rq_id: {payload}")
            return rq_id

    def import_annotations(
        self,
        *,
        task_id: int,
        annotation_file: Path,
        annotation_format: str,
        timeout: int = 3600,
    ) -> str | None:
        if not annotation_file.exists():
            raise FileNotFoundError(annotation_file)

        with annotation_file.open("rb") as f:
            files = {
                "annotation_file": (annotation_file.name, f, "application/octet-stream")
            }
            resp = self._request(
                "POST",
                f"/api/tasks/{task_id}/annotations",
                params={"format": annotation_format},
                files=files,
                timeout=timeout,
            )

        # 201 代表直接完成,202 返回 rq_id 异步执行
        if resp.status_code == 201:
            return None

        payload = self._json(resp)
        rq_id = payload.get("rq_id")
        if not rq_id:
            raise CVATApiError(f"import_annotations no rq_id: {payload}")
        return rq_id

    def get_request(self, rq_id: str) -> dict[str, Any]:
        resp = self._request("GET", f"/api/requests/{rq_id}")
        return self._json(resp)

    def wait_request(self, rq_id: str, *, timeout_sec: int = 7200, interval_sec: int = 3) -> dict[str, Any]:
        start = time.time()
        while True:
            req = self.get_request(rq_id)
            status = req.get("status")
            progress = req.get("progress")
            message = req.get("message", "")
            print(f"[RQ] {rq_id} status={status} progress={progress} message={message}")

            if status == "finished":
                return req
            if status == "failed":
                raise CVATApiError(f"rq_id={rq_id} failed, message={message}, req={req}")

            if time.time() - start > timeout_sec:
                raise TimeoutError(f"rq_id={rq_id} wait timeout>{timeout_sec}s")

            time.sleep(interval_sec)


def collect_files(globs: list[str]) -> list[Path]:
    out: list[Path] = []
    for g in globs:
        out.extend(Path(p) for p in glob.glob(g))
    # 去重 + 排序,确保结果可复现
    uniq = sorted(set(out), key=lambda x: str(x))
    return [p for p in uniq if p.is_file()]


def load_json(path: Path) -> dict[str, Any]:
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def save_json(path: Path, data: dict[str, Any]) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


def main():
    parser = argparse.ArgumentParser(description="Batch create CVAT tasks and upload data")
    parser.add_argument("--host", required=True, help="e.g. https://cvat.example.com")
    parser.add_argument("--token", default=os.getenv("CVAT_TOKEN"), help="Access token (Bearer)")
    parser.add_argument("--username", default=os.getenv("CVAT_USERNAME"))
    parser.add_argument("--password", default=os.getenv("CVAT_PASSWORD"))
    parser.add_argument("--organization", default=os.getenv("CVAT_ORG"), help="X-Organization slug")
    parser.add_argument("--spec", required=True, type=Path, help="batch_spec.json")
    parser.add_argument("--state", default=Path("./.cvat_batch_state.json"), type=Path)
    parser.add_argument("--insecure", action="store_true", help="disable SSL verification")
    args = parser.parse_args()

    spec = load_json(args.spec)
    state = load_json(args.state) if args.state.exists() else {"tasks": {}}

    client = CVATClient(
        host=args.host,
        token=args.token,
        username=args.username,
        password=args.password,
        organization=args.organization,
        verify_ssl=not args.insecure,
    )
    client.login_if_needed()

    project_id = None
    project_cfg = spec.get("project")
    if project_cfg:
        project = client.get_or_create_project(
            name=project_cfg["name"],
            labels=project_cfg.get("labels"),
        )
        project_id = project["id"]
        print(f"[INFO] using project id={project_id}, name={project.get('name')}")

    for t in spec.get("tasks", []):
        task_name = t["name"]
        print(f"\n=== TASK: {task_name} ===")

        existing = client.find_task(task_name, project_id=project_id)
        if existing:
            task = existing
            print(f"[INFO] task exists, id={task['id']}, skip create")
        else:
            task = client.create_task(
                name=task_name,
                project_id=project_id,
                labels=t.get("labels"),
                subset=t.get("subset"),
                segment_size=t.get("segment_size"),
                overlap=t.get("overlap"),
            )
            print(f"[INFO] task created, id={task['id']}")

        task_id = task["id"]
        state["tasks"].setdefault(task_name, {})
        state["tasks"][task_name]["task_id"] = task_id
        save_json(args.state, state)

        # 1) 上传数据(仅当首次创建任务时自动执行;已存在任务默认跳过,避免重复附加报错)
        if not existing:
            globs = t.get("data_globs", [])
            files = collect_files(globs)
            print(f"[INFO] files matched: {len(files)}")
            if files:
                rq_id = client.attach_data(
                    task_id=task_id,
                    files=files,
                    image_quality=int(t.get("image_quality", 80)),
                    use_cache=bool(t.get("use_cache", True)),
                )
                state["tasks"][task_name]["upload_rq_id"] = rq_id
                save_json(args.state, state)
                client.wait_request(rq_id)
            else:
                print("[WARN] no files matched, skip upload")
        else:
            print("[INFO] skip data upload for existing task")

        # 2) 导入注释(可选)
        ann = t.get("annotation_file")
        ann_fmt = t.get("annotation_format")
        if ann and ann_fmt:
            ann_path = Path(ann)
            rq_id = client.import_annotations(
                task_id=task_id,
                annotation_file=ann_path,
                annotation_format=ann_fmt,
            )
            if rq_id:
                state["tasks"][task_name]["import_rq_id"] = rq_id
                save_json(args.state, state)
                client.wait_request(rq_id)
            else:
                print("[INFO] annotation import finished synchronously")

        state["tasks"][task_name]["done"] = True
        save_json(args.state, state)
        print(f"[DONE] task={task_name}, id={task_id}")

    print("\nAll tasks finished.")


if __name__ == "__main__":
    main()

6.4 运行方式

python cvat_batch_tasks.py \
  --host https://cvat.example.com \
  --username admin \
  --password '***' \
  --organization your-org-slug \
  --spec ./batch_spec.json

或使用 Access Token:

python cvat_batch_tasks.py \
  --host https://cvat.example.com \
  --token "$CVAT_TOKEN" \
  --organization your-org-slug \
  --spec ./batch_spec.json

6.5 脚本说明与扩展建议

  1. 该脚本用“任务名 + 项目”做幂等主键;你可以改成业务 batch id。
  2. 已存在任务默认不重复上传数据(CVAT 设计上不支持简单追加覆盖)。
  3. 大规模上传(超大文件/弱网)建议改造为 TUS 分块协议,或使用官方 cvat-sdk / cvat-cli
  4. 若需要并发,可在任务层加线程池,但必须控制并发度,避免把 import/export 队列打满。
  5. 建议把 state 文件同步到对象存储或 CI 制品,确保作业恢复能力。

7. 源码链路拆解:请求入口→序列化→业务层→异步队列→存储

这部分直接对应你后续排障能力。

7.1 请求入口(URL Router)

核心入口在:

即:

7.2 视图层(TaskViewSet)

cvat/apps/engine/views.py

关键工程点:

7.3 序列化层(DataSerializer / TaskWriteSerializer)

cvat/apps/engine/serializers.pyDataSerializer 会验证:

这意味着很多“看似业务错误”的失败,其实在 serializer 层就被拒绝了。

7.4 业务层(task.create_thread)

异步任务最终回调到:

这里是真正“重活区”:

  1. 验证 job 映射与 validation 参数。
  2. 处理 remote/cloud/share/local 数据来源。
  3. 识别 manifest 与排序策略。
  4. 媒体类型提取(图像/视频/压缩包/点云)。
  5. 写入 Image/Video 元数据。
  6. 生成 segment 与 jobs。
  7. 生成静态 chunk 或按需缓存策略。
  8. 准备预览帧。

你看到“任务创建很慢”,多数瓶颈就在这个函数覆盖的流程里。

7.5 异步队列层(AbstractRequestManager + RQ)

机制要点:

  1. 生成 request id(rq_id)。
  2. 若同 request id 任务正在跑,返回 409 + 原 rq_id(避免重复执行)。
  3. 入队(queue 名由 settings 中 CVAT_QUEUES 定义:import/export/annotation/...)。

这就是为什么 API 自动化必须把 rq_id 当一等公民管理。

7.6 存储落盘

cvat/apps/engine/models.pyData 模型定义了路径组织:

对应容器内根路径通常是 /home/django/data(映射到 cvat_data 卷)。

7.7 请求状态与结果

cvat/apps/redis_handler/views.py + serializers.py

导出类任务 finished 后通常携带 result_url;创建类任务可能返回 result_id

7.8 一句话串起来

API请求ViewSetSerializerBackground Manager enqueueWorker执行业务函数DB/文件存储Requests API轮询状态

理解这条链,CVAT 80% 的故障你都能定位。


8. 性能调优与常见故障排查

8.1 性能调优优先级

先看瓶颈在哪层:

  1. IO 瓶颈(最常见):大批小文件、网络存储慢、磁盘随机读写差。
  2. CPU 瓶颈:视频解码、chunk 生成、导入导出转换。
  3. 队列瓶颈:worker 并发不足或任务串行策略过严。
  4. 数据库瓶颈:慢查询、连接数耗尽、磁盘延迟。

8.2 可调参数(实用)

8.3 场景化建议

场景 A:海量图片(>100 万)

场景 B:长视频

场景 C:云存储直连

8.4 故障排查表

现象
可能原因
快速检查
处理建议

任务长期 queued
import worker 异常、Redis 问题
docker logs cvat_worker_import -f
重启 worker/修复 Redis,恢复后继续轮询

导入失败 Unknown format
格式名不匹配或禁用
GET /api/server/annotation/formats
用接口返回格式名,不手写猜测

上传报 Adding more data is not supported
对已有数据任务重复附加
看 task 是否已存在且有数据
任务级幂等:已有任务跳过上传

503 Redis service is not available
Redis 不可达
docker compose ps + redis 日志
先恢复 Redis,再处理业务重试

导出很慢或失败
worker 并发不足、磁盘紧张
export 队列长度、磁盘占用
提升 export worker,清理缓存

UI 卡顿
chunk 生成慢、网络慢
浏览器请求耗时、chunks worker 状态
提升 chunk 并发,优化存储 IO

升级后启动失败
migration 问题或版本跳跃过大
docker logs cvat_server -f
回滚到备份版本,按官方路径升级

恢复后数据异常
版本不一致、卷恢复不完整
样本任务验证
同版本恢复后再升级

8.5 排障命令清单

# 服务状态
docker compose ps

# 关键日志
docker logs cvat_server -f
docker logs cvat_worker_import -f
docker logs cvat_worker_export -f
docker logs cvat_worker_chunks -f

# DB 连通性(示例)
docker exec -it cvat_db psql -U root -d cvat -c 'select now();'

# 查看最近异步请求(也可 API)
# GET /api/requests?page_size=20&sort=-created_date

8.6 性能优化落地节奏(建议)

  1. 先建基线:记录当前吞吐和时延。
  2. 每次只改一个参数(例如 NUMPROCS)。
  3. 跑同批压测样本,记录前后差异。
  4. 达标后固化配置和 runbook。

9. 与 Label Studio / Supervisely 对比(工程视角)

这里不做“好坏绝对论”,只谈工程匹配度与组织成本。

9.1 对比维度

维度
CVAT
Label Studio
Supervisely

典型定位
视觉标注生产平台
通用标注平台(多模态、配置化)
视觉平台化生态(标注 + 数据管理 + 应用)

视频/跟踪能力
强,长期优化
可支持,但深度视频链路通常要额外工程化
强,生态应用较丰富

协作模型
Project/Task/Job + 阶段流 + 质量模块
任务模板驱动,流程灵活但需自行治理
平台化工作流,组织能力较强

异步导入导出
rq_id + /api/requests 模式清晰
取决于任务与部署模式
平台任务体系完善

二次开发方式
REST API + SDK + 源码改造 + serverless
XML 配置 + API + ML backend 扩展
SDK + App 机制,平台扩展能力强

部署复杂度
中高(组件较多)
中低(起步快)
中高(平台化能力强但学习成本更高)

企业治理(权限/审计)
支持组织角色与策略治理
可做,需更多自定义
企业版治理能力相对完整

适合阶段
从 PoC 到规模化生产都可
快速试验/多模态探索
平台化建设与深度运营

9.2 选型建议

优先选 CVAT 的场景:

优先选 Label Studio 的场景:

优先选 Supervisely 的场景:

9.3 常见组合策略(真实企业更常见)

在不少企业里,最终不是“单平台宗教”,而是分层使用:

  1. 探索层:用 Label Studio 快速定义新任务、快速收集先验;
  2. 生产层:成熟任务迁移到 CVAT 做规模化标注与质控;
  3. 平台层:若组织具备平台能力,再引入 Supervisely 生态做统一运营。

关键判断标准不是“工具功能对比表”,而是三件事:


10. 企业落地路线图(30/60/90 天)

0~30 天:打基础(可用)

目标:从 0 到可运行,形成最小闭环。

交付物:

  1. 生产级 Compose 部署(HTTPS、备份、监控初版)。
  2. 角色与组织模型设计。
  3. 标签规范 v1 与验收标准 v1。
  4. API POC(创建任务、上传、轮询、导出)。
  5. 试运行 1~2 条业务线。

验收指标:

31~60 天:提效率(可规模)

目标:把人工流程变成半自动流水线。

交付物:

  1. 批量建任务脚本接入 CI/CD。
  2. 质检流程上线(GT/即时反馈/抽检看板)。
  3. 导入导出标准化(格式白名单、失败重试策略)。
  4. 排障 runbook 与值班手册。

验收指标:

61~90 天:可治理(可持续)

目标:建立可运营、可审计、可升级的长期体系。

交付物:

  1. SLA/SLO 定义(可用性、时延、恢复目标)。
  2. 成本模型(存储/算力/人效)与优化策略。
  3. 升级演练机制(预生产回归 + 回滚)。
  4. 数据资产治理(数据版本、标注版本、追溯链)。

验收指标:


11. 参考资料(官方文档与仓库优先)

官方文档

  1. Installation Guide
    https://docs.cvat.ai/docs/administration/community/basics/installation/

  2. CVAT Architecture
    https://docs.cvat.ai/docs/administration/community/advanced/cvat-architecture/

  3. API Overview / Server API
    https://docs.cvat.ai/docs/api_sdk/api/

  4. 在线 Swagger(示例实例)
    https://app.cvat.ai/api/swagger

  5. OpenAPI Schema(示例实例)
    https://app.cvat.ai/api/schema/

  6. Backup Guide
    https://docs.cvat.ai/docs/administration/community/advanced/backup_guide/

  7. Upgrade Guide
    https://docs.cvat.ai/docs/administration/community/advanced/upgrade_guide/

  8. Analytics and monitoring
    https://docs.cvat.ai/docs/administration/community/advanced/analytics/

  9. Tasks / Projects / Jobs / Quality control
    https://docs.cvat.ai/docs/workspace/tasks-page/
    https://docs.cvat.ai/docs/workspace/projects/
    https://docs.cvat.ai/docs/workspace/jobs-page/
    https://docs.cvat.ai/docs/qa-analytics/quality-control/

  10. Consensus annotation
    https://docs.cvat.ai/docs/qa-analytics/consensus/

  11. Data on the fly
    https://docs.cvat.ai/docs/dataset_management/data-on-fly/

对比参考资料(外部资料,仅用于工程横向分析)

官方仓库

关键源码入口(建议重点阅读):


附录 A:给技术负责人看的落地建议(浓缩版)

  1. 把 CVAT 当“异步数据工厂”,不要当“单页标注工具”。
  2. 先固化标签规范,再谈规模提速。
  3. 任务自动化必须做幂等和状态落盘。
  4. 生产部署至少要有:HTTPS、备份、监控、回滚预案。
  5. 导入导出链路一律走 /api/requests/{rq_id} 轮询,不要同步阻塞。
  6. 质量控制必须前移:GT/即时反馈/共识机制按成本分层使用。
  7. 升级永远先演练后上线,尤其涉及 DB/FFmpeg 等底层变更。
  8. 建立 runbook:出现 queued 卡死409 冲突格式不匹配时团队能快速自救。
  9. 将 CVAT 事件与内部日志打通,形成请求级可追踪闭环。
  10. 30/60/90 天分阶段推进,不要试图“一次做完所有治理”。

如果你接下来要进入实操阶段,建议第一周先做两件事:

这两件事完成后,你的 CVAT 就具备了进入生产迭代的基本资格。

附录 B:生产部署实操手册(从裸机到上线)

这一节给出一个“可以抄作业”的上线手册。核心目标是:让部署动作可重复、可审计、可回滚

B.1 机器与系统基线建议

以下是社区版常见负载下的经验值,不是绝对标准。你的真实配置应以压测结果为准。

B.1.1 计算资源分档

档位
并发标注人数
推荐配置
适用场景

S
10~30
8 vCPU / 32GB / NVMe 1TB
小团队试运行

M
30~80
16 vCPU / 64GB / NVMe 2~4TB
多项目并行

L
80~200
32 vCPU / 128GB / NVMe 8TB+ + 外置对象存储
企业生产

B.1.2 磁盘与文件系统

B.1.3 操作系统基线

建议在上线前固定并记录:

B.2 目录与权限规范

建议所有 CVAT 持久目录统一挂在 /data/cvat

/data/cvat/
  ├── data/         # 媒体与任务数据
  ├── db/           # PostgreSQL
  ├── events/       # ClickHouse
  ├── logs/         # 服务日志
  ├── keys/         # 密钥与敏感文件
  ├── backup/       # 备份产物暂存
  └── scripts/      # 运维脚本

权限建议:

B.3 .env.production 模板(建议)

# 基础
CVAT_HOST=cvat.example.com
ACME_EMAIL=ops@example.com
CVAT_VERSION=v2.56.1

# 安全
ALLOWED_HOSTS=cvat.example.com
DJANGO_SECRET_KEY=replace-with-very-long-random-value

# 数据库(外置示例)
CVAT_POSTGRES_HOST=pg-rw.internal
CVAT_POSTGRES_PORT=5432
CVAT_POSTGRES_DBNAME=cvat
CVAT_POSTGRES_USER=cvat
CVAT_POSTGRES_PASSWORD=***

# 队列与并发
ONE_RUNNING_JOB_IN_QUEUE_PER_USER=true
CVAT_CONCURRENT_CHUNK_PROCESSING=2

# 日志
DJANGO_LOG_LEVEL=INFO

上线原则:

  1. .env.production 不入 Git。
  2. 所有敏感项尽量迁移到 Secret 管理系统。
  3. 变量变更要走变更单审批与审计。

B.4 标准上线 Playbook

阶段 1:发布前

  1. 预生产回归通过(包含 API、上传、导出、质检、权限)。
  2. 备份状态确认:最近一次全量备份可恢复。
  3. 发布窗口确认:业务方、标注团队、运维值班在场。

阶段 2:执行发布

# 1) 拉取代码与版本
cd /opt/cvat
git fetch --all
git checkout v2.56.1

# 2) 预拉镜像
docker compose pull

# 3) 停服
docker compose down

# 4) 启动
docker compose \
  -f docker-compose.yml \
  -f docker-compose.https.yml \
  up -d

# 5) 观察迁移与启动
docker logs cvat_server -f

阶段 3:发布后验证

阶段 4:收尾

B.5 变更单模板(建议字段)

B.6 生产环境不建议做的事

  1. 直接在生产容器里改代码。
  2. 没有备份就升级。
  3. 开放 DB/Redis 公网端口。
  4. 默认 admin 账号长期共享。
  5. 无审计地手工删任务/删卷。

附录 C:REST API 端到端模板(含幂等与重试)

C.1 端到端时序(脚本视角)

  1. 登录或注入 token。
  2. 查项目是否存在,不存在则创建。
  3. 查任务是否存在,不存在则创建。
  4. 上传数据,拿到 rq_id
  5. 轮询 rq_id 直到完成。
  6. 可选导入注释(再次拿 rq_id 轮询)。
  7. 可选导出并下载结果。
  8. 写入本地/中心化状态表。

C.2 cURL 示例全集

C.2.1 登录

curl -sS -X POST "$CVAT/api/auth/login" \
  -H 'Content-Type: application/json' \
  -d '{"username":"admin","password":"***"}'

返回:

{"key":"<TOKEN>"}

C.2.2 创建项目

curl -sS -X POST "$CVAT/api/projects" \
  -H "Authorization: Token $TOKEN" \
  -H 'Content-Type: application/json' \
  -d '{
    "name":"traffic-2026q1",
    "labels":[
      {"name":"car","type":"rectangle"},
      {"name":"person","type":"rectangle"}
    ]
  }'

C.2.3 创建任务

curl -sS -X POST "$CVAT/api/tasks" \
  -H "Authorization: Token $TOKEN" \
  -H 'Content-Type: application/json' \
  -d '{
    "name":"cam01-2026-02-01",
    "project_id":123,
    "subset":"train",
    "segment_size":500,
    "overlap":10
  }'

C.2.4 上传数据

curl -sS -X POST "$CVAT/api/tasks/456/data/" \
  -H "Authorization: Token $TOKEN" \
  -F "image_quality=80" \
  -F "use_cache=true" \
  -F "sorting_method=lexicographical" \
  -F "client_files=@./datasets/a.jpg" \
  -F "client_files=@./datasets/b.jpg"

典型返回:

{"rq_id":"action=create&target=task&target_id=456"}

C.2.5 轮询请求状态

curl -sS "$CVAT/api/requests/$RQ_ID" \
  -H "Authorization: Token $TOKEN"

C.2.6 导入注释

curl -sS -X POST "$CVAT/api/tasks/456/annotations?format=COCO%201.0" \
  -H "Authorization: Token $TOKEN" \
  -F "annotation_file=@./annotations/task456.zip"

C.2.7 导出数据集

curl -sS -X POST "$CVAT/api/tasks/456/dataset/export?format=COCO%201.0&save_images=true" \
  -H "Authorization: Token $TOKEN"

C.3 客户端重试策略(建议)

重试分级:

退避策略:

同时加“熔断保护”:短时间失败率过高时暂停新任务,避免雪崩。

C.4 幂等状态表设计(SQL 示例)

CREATE TABLE cvat_job_ledger (
  id BIGSERIAL PRIMARY KEY,
  biz_key TEXT NOT NULL UNIQUE,
  project_name TEXT,
  task_name TEXT,
  task_id BIGINT,
  stage TEXT NOT NULL,
  rq_id TEXT,
  status TEXT NOT NULL,
  error_message TEXT,
  created_at TIMESTAMP NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);

说明:

C.5 审计建议字段

每次 API 调用至少记录:

这样你才能把“业务失败”定位到“具体哪次请求”。

C.6 高可用场景下的调用策略

  1. 使用连接池和 Keep-Alive,减少握手开销。
  2. 上传动作串行或小并发,避免一次打爆 import 队列。
  3. 导出动作可批量排队,但要控总并发。
  4. 长任务轮询间隔不要太短(3~10 秒足够)。

附录 D:标注团队 SOP(可直接落地)

D.1 角色职责矩阵

角色
核心职责
不应做的事

Annotator
按规范标注,提交作业
擅自改标签体系

Reviewer
复核并指出问题
越权修改项目配置

Maintainer
任务配置、成员协同、质量看板
长期使用 owner 账号做日常操作

Owner
组织级治理、风险控制
直接介入一线标注细节

D.2 日常节奏(建议)

班前 10 分钟

班中

班后

D.3 质量抽检策略

建议使用“分层抽检”:

  1. 新人:高抽检(20%~50%)。
  2. 稳定成员:中抽检(5%~15%)。
  3. 关键数据(评测集、上线集):强制全检或共识标注。

抽检维度至少包含:

D.4 返工闭环

返工不是“打回重做”这么简单,必须形成知识闭环:

  1. 记录返工类型(分类错误、漏标、边界偏差、属性缺失)。
  2. 统计每周 top5 错误模式。
  3. 更新规范与示例库。
  4. 下一周验证同类错误是否下降。

D.5 新人培训四阶段

  1. 规范学习(半天):理解标签与属性边界。
  2. 样例实操(1 天):做标准样本并对照答案。
  3. 带教生产(3~5 天):双人复核。
  4. 独立准入:抽检通过率达标后再放量。

D.6 指标看板建议


附录 E:故障演练(GameDay)剧本

你可以每月做一次 60~90 分钟演练,目标是让团队在真实故障发生时“不慌”。

E.1 演练 1:Import Worker 宕机

E.2 演练 2:Redis 不可用

E.3 演练 3:磁盘空间耗尽

E.4 演练 4:错误版本升级

E.5 演练 5:权限误配置

E.6 演练输出模板

每次演练后沉淀:

  1. 发现的问题列表。
  2. 修复优先级与责任人。
  3. SOP 是否更新。
  4. 下次复演时间。

附录 F:容量与成本模型(给管理层)

F.1 容量模型

定义:

估算:

Storage_total ≈ N_d × S_avg × D_ret × K_cache

F.2 人效模型

定义:

等效单位成本可近似:

Cost_per_sample ≈ C_labor × (T_ann + T_rev × (1 + R_rework)) + C_platform

当你把 R_rework 从 20% 拉到 8%,成本下降往往比“盲目提速”更显著。

F.3 ROI 计算思路

自动化改造(脚本化建任务、自动轮询、自动导入导出)通常带来三类收益:

  1. 减少人工操作时间(平台运营同学节省工时)。
  2. 降低失败重跑次数(幂等 + 状态追踪)。
  3. 降低停机风险(可回滚 + 可观测)。

建议每月复盘:


附录 G:上线前最终核对清单(可直接打印)

G.1 架构与部署

G.2 数据与备份

G.3 API 与自动化

G.4 质量与流程

G.5 运维与应急

完成以上 5 类检查,你的 CVAT 系统才算真正达到“可生产”状态。

目录 最新
← 左侧翻上一屏 · 右侧翻下一屏 · 中间唤出菜单