简述
CloudCanal近期实现了MySQL(RDS)到ClickHouse实时同步的能力,功能包含全量数据迁移、增量数据迁移、结构迁移能力,以及附带的监控、告警、HA等能力(平台自带)。
ClickHouse本身并不直接支持Update和Delete能力,但是他自带的MergeTe系列表中CollapsingMergeTe和VersionedCollapsingMergeTe可变相实现实时增量的目的,并且性能完全够用,能够比较轻松达到1kRPS以上的能力。
接下来的文章,简要介绍CloudCanal是如何实现这个能力,以及作为用户我们怎么比较好的使用这个能力。
技术点
结构迁移
CloudCanal默认提供结构迁移,默认选择CollapsingMergeTe作为表引擎,并增加一个默认字段__cc_ck_sign,源主键作为sortKey,如下示例:
CREATETABLEconsole.worker_stats(`id`Int64,`gmt_cate`DateTime,`worker_id`Int64,`cpu_stat`String,`mem_stat`String,`disk_stat`String,`__cc_ck_sign`Int8DEFAULT1)ENGINE=CollapsingMergeTe(__cc_ck_sign)ORDERBYidSETTINGSindex_granularity=
ClickHouse表引擎中,CollapsingMergeTe和VersionedCollapsingMergeTe都能通过标记位按规则折叠数据,从而达到更新和删除的效果。VersionedCollapsingMergeTe相比CollapsingMergeTe优势在于同一条数据的不同变更可以乱序写入,但是CloudCanal选择CollapsingMergeTe主要原因在于2点
CloudCanal中同一条记录必定是按源库变更顺序写入,不存在乱序情况不需要维护VersionedCollapsingMergeTe中的Version字段(版本,也可以起其他名字)
所以CloudCanal选择了CollapsingMergeTe作为默认表引擎。
写数据
CloudCanal写数据主要包含全量和增量两种,即单次搬迁存量数据和长期同步,两者写入略有不同。全量写入对端主要工作是批量和多线程,因为CloudCanal结构迁移默认设置了标记位字段__cc_ck_signdefault值为1,所以就不需要做特殊处理。
对于增量,CloudCanal则需要做3件事情。
转换Update、Delete操作为Insert这一步有两件事情要做,第一件是按照操作类型,填充标记字段值,其中Insert和Update为1,Delete为-1,第二件是将对应增量数据的前镜像或者后镜像填充到结果记录中,以便后续insert写入。
for(CanalRowChangerowChange:rowChanges){switch(rowChange.getEventType()){caseINSERT:{for(CanalRowDatarowData:rowChange.getRowDatasList()){rowData.getAfterColumnsList().add(nonDeleteCol);cords.add(rowData.getAfterColumnsList());}bak;}caseUPDATE:{for(CanalRowDatarowData:rowChange.getRowDatasList()){rowData.getBefoColumnsList().add(deleteCol);cords.add(rowData.getBefoColumnsList());rowData.getAfterColumnsList().add(nonDeleteCol);cords.add(rowData.getAfterColumnsList());}bak;}caseDELETE:{for(CanalRowDatarowData:rowChange.getRowDatasList()){rowData.getBefoColumnsList().add(deleteCol);cords.add(rowData.getBefoColumnsList());}bak;}default:thrownewCanalException("notsupportedeventtype,eventType:"+rowChange.getEventType());}}
按表归组因为IUD操作已全部转换为Insert,且为全镜像(所有字段都填充了值),所以可以按表归组,然后批量写入。即使单线程也能满足大部分场景的同步性能要求。
protectedMapTableUnit,ListCanalRowChangegroupByTable(IncmentMessagemessage){MapTableUnit,ListCanalRowChangedata=newHashMap();for(ParsedEntryentry:message.getEntries()){if(entry.getEntryType()==CanalEntryType.ROWDATA){CanalRowChangerowChange=entry.getRowChange();if(!rowChange.isDdl()){ListCanalRowChangechanges=data.