MySQL到ClickHouse实时同步

北京根治皮炎医院 http://baidianfeng.39.net/a_yufang/210112/8582670.html

简述

CloudCanal近期实现了MySQL(RDS)到ClickHouse实时同步的能力,功能包含全量数据迁移、增量数据迁移、结构迁移能力,以及附带的监控、告警、HA等能力(平台自带)。

ClickHouse本身并不直接支持Update和Delete能力,但是他自带的MergeTe系列表中CollapsingMergeTe和VersionedCollapsingMergeTe可变相实现实时增量的目的,并且性能完全够用,能够比较轻松达到1kRPS以上的能力。

接下来的文章,简要介绍CloudCanal是如何实现这个能力,以及作为用户我们怎么比较好的使用这个能力。

技术点

结构迁移

CloudCanal默认提供结构迁移,默认选择CollapsingMergeTe作为表引擎,并增加一个默认字段__cc_ck_sign,源主键作为sortKey,如下示例:

CREATETABLEconsole.worker_stats(`id`Int64,`gmt_cate`DateTime,`worker_id`Int64,`cpu_stat`String,`mem_stat`String,`disk_stat`String,`__cc_ck_sign`Int8DEFAULT1)ENGINE=CollapsingMergeTe(__cc_ck_sign)ORDERBYidSETTINGSindex_granularity=

ClickHouse表引擎中,CollapsingMergeTe和VersionedCollapsingMergeTe都能通过标记位按规则折叠数据,从而达到更新和删除的效果。VersionedCollapsingMergeTe相比CollapsingMergeTe优势在于同一条数据的不同变更可以乱序写入,但是CloudCanal选择CollapsingMergeTe主要原因在于2点

CloudCanal中同一条记录必定是按源库变更顺序写入,不存在乱序情况不需要维护VersionedCollapsingMergeTe中的Version字段(版本,也可以起其他名字)

所以CloudCanal选择了CollapsingMergeTe作为默认表引擎。

写数据

CloudCanal写数据主要包含全量和增量两种,即单次搬迁存量数据和长期同步,两者写入略有不同。全量写入对端主要工作是批量和多线程,因为CloudCanal结构迁移默认设置了标记位字段__cc_ck_signdefault值为1,所以就不需要做特殊处理。

对于增量,CloudCanal则需要做3件事情。

转换Update、Delete操作为Insert这一步有两件事情要做,第一件是按照操作类型,填充标记字段值,其中Insert和Update为1,Delete为-1,第二件是将对应增量数据的前镜像或者后镜像填充到结果记录中,以便后续insert写入。

for(CanalRowChangerowChange:rowChanges){switch(rowChange.getEventType()){caseINSERT:{for(CanalRowDatarowData:rowChange.getRowDatasList()){rowData.getAfterColumnsList().add(nonDeleteCol);cords.add(rowData.getAfterColumnsList());}bak;}caseUPDATE:{for(CanalRowDatarowData:rowChange.getRowDatasList()){rowData.getBefoColumnsList().add(deleteCol);cords.add(rowData.getBefoColumnsList());rowData.getAfterColumnsList().add(nonDeleteCol);cords.add(rowData.getAfterColumnsList());}bak;}caseDELETE:{for(CanalRowDatarowData:rowChange.getRowDatasList()){rowData.getBefoColumnsList().add(deleteCol);cords.add(rowData.getBefoColumnsList());}bak;}default:thrownewCanalException("notsupportedeventtype,eventType:"+rowChange.getEventType());}}

按表归组因为IUD操作已全部转换为Insert,且为全镜像(所有字段都填充了值),所以可以按表归组,然后批量写入。即使单线程也能满足大部分场景的同步性能要求。

protectedMapTableUnit,ListCanalRowChangegroupByTable(IncmentMessagemessage){MapTableUnit,ListCanalRowChangedata=newHashMap();for(ParsedEntryentry:message.getEntries()){if(entry.getEntryType()==CanalEntryType.ROWDATA){CanalRowChangerowChange=entry.getRowChange();if(!rowChange.isDdl()){ListCanalRowChangechanges=data.


转载请注明:http://www.aierlanlan.com/rzgz/2115.html

  • 上一篇文章:
  •   
  • 下一篇文章: