案例研究|Case Study
从ClickHouse到Apache Doris,腾讯音乐内容库数据平台架构演进实践
腾讯音乐内容库数据平台旨在为应用层提供库存盘点、分群画像、指标分析、标签圈选等内容分析服务,高效为业务赋能。
目前,内容库数据平台的数据架构已经从1.0演进到了4.0,经历了分析引擎从ClickHouse到Apache Doris的替换、经历了数据架构语义层的初步引入到深度应用,有效提高了数据时效性、降低了运维成本、解决了数据管理割裂等问题,收益显著。
本文将为大家分享腾讯音乐内容库数据平台的数据架构演进历程与实践思考,希望所有读者从文章中有所启发。
作者:腾讯音乐内容库数据平台 张俊、代凯
腾讯音乐娱乐集团(简称“腾讯音乐娱乐”)是中国在线音乐娱乐服务开拓者,提供在线音乐和以音乐为核心的社交娱乐两大服务。腾讯音乐娱乐在中国有着广泛的用户基础,拥有目前国内市场知名的四大移动音乐产品:QQ音乐、酷狗音乐、酷我音乐和全民K歌,总月活用户数超过8亿。
业务需求
腾讯音乐娱乐拥有海量的内容曲库,包括录制音乐、现场音乐、音频和视频等多种形式。通过技术和数据的赋能,腾讯音乐娱乐持续创新产品,为用户带来更好的产品体验,提高用户参与度,也为音乐人和合作伙伴在音乐的制作、发行和销售方面提供更大的支持。
在业务运营过程中我们需要对包括歌曲、词曲、专辑、艺人在内的内容对象进行全方位分析,高效为业务赋能,内容库数据平台旨在集成各数据源的数据,整合形成内容数据资产(以指标和标签体系为载体),为应用层提供库存盘点、分群画像、指标分析、标签圈选等内容分析服务。
数据架构的演进历程
TDW是腾讯最大的离线数据处理平台,公司内大多数业务的产品报表、运营分析、数据挖掘等的存储和计算都是在TDW中进行,内容库数据平台的数据加工链路同样是在腾讯数据仓库TDW上构建的。截止目前,内容库数据平台的数据架构已经从1.0演进到了4.0,经历了分析引擎从ClickHouse到Apache Doris的替换、经历了数据架构语义层的初步引入到深度应用,有效提高了数据时效性、降低了运维成本、解决了数据管理割裂等问题,收益显著。接下来将为大家分享腾讯音乐内容库数据平台的数据架构演进历程与实践思考。
数据架构1.0
如图所示为数据架构1.0架构图,分为数仓层、加速层、应用层三部分,数据架构1.0是一个相对主流的架构,简单介绍一下各层的作用及工作原理:
• 数仓层:通过ODS-DWD-DWS三层将数据整合为不同主题的标签和指标体系,DWM集市层围绕内容对象构建大宽表,从不同主题域DWS表中抽取字段。
• 加速层:在数仓中构建的大宽表导入到加速层中,Clickhouse作为分析引擎,Elasticsearch作为搜索/圈选引擎。
• 应用层:根据场景创建DataSet,作为逻辑视图从大宽表选取所需的标签与指标,同时可以二次定义衍生的标签与指标。
存在的问题:
• 数仓层:不支持部分列更新,当上游任一来源表产生延迟,均会造成大宽表延迟,进而导致数据时效性下降。
• 加速层:不同的标签跟指标特性不同、更新频率也各不相同。由于ClickHouse目前更擅长处理宽表场景,无区别将所有数据导入大宽表生成天的分区将造成存储资源的浪费,维护成本也将随之升高。
• 应用层:ClickHouse采用的是计算和存储节点强耦合的架构,架构复杂,组件依赖严重,牵一发而动全身,容易出现集群稳定性问题,对于我们来说,同时维护ClickHouse和Elasticsearch两套引擎的连接与查询,成本和难度都比较高。
除此之外,ClickHouse由国外开源,交流具有一定的语言学习成本,遇到问题无法准确反馈、无法快速获得解决,与社区沟通上的阻塞也是促进我们进行架构升级的因素之一。
数据架构2.0
基于架构1.0存在的问题和ClickHouse的局限性,我们尝试对架构进行优化升级,将分析引擎ClickHouse切换为Doris,Doris具有以下的优势:
Apache Doris的优势:
• Doris架构极简易用,部署只需两个进程,不依赖其他系统,运维简单;兼容MySQL协议,并且使用标准SQL。
• 支持丰富的数据模型,可满足多种数据更新方式,支持部分列更新。
• 支持对Hive、Iceberg、Hudi等数据湖和MySQL、Elasticsearch等数据库的联邦查询分析。
• 导入方式多样,支持从HDFS/S3等远端存储批量导入,也支持读取MySQL Binlog以及订阅消息队列Kafka中的数据,还可以通过Flink Connector实时/批次同步数据源(MySQL,Oracle,PostgreSQL等)到Doris。
• 社区目前Apache Doris社区活跃、技术交流更多,SelectDB针对社区有专职的技术支持团队,在使用过程中遇到问题均能快速得到响应解决。
同时我们也利用Doris的特性,解决了架构1.0中较为突出的问题。
• 数仓层:Apache Doris的Aggregate数据模型可支持部分列实时更新,因此我们去掉了DWM集市层的构建,直接增量到Doris/ES中构建宽表,解决了架构1.0中上游数据更新延迟导致整个宽表延迟的问题,进而提升了数据的时效性。数据(指标、标签等)通过Spark统一离线加载到Kafka中,使用Flink将数据增量更新到Doris和ES中(利用Flink实现进一步的聚合,减轻了Doris和ES的更新压力)。
• 加速层:该层主要将大宽表拆为小宽表,根据更新频率配置不同的分区策略,减小数据冗余带来的存储压力,提高查询吞吐量。Doris具备多表查询和联邦查询性能特性,可以利用多表关联特性实现组合查询。
• 应用层:DataSet统一指向Doris,Doris支持外表查询,利用该特性可对ES引擎直接查询。
架构2.0存在的问题:
• DataSet灵活度较高,数据分析师可对指标和标签自由组合和定义,但是不同的分析师对同一数据的定义不尽相同、定义口径不一致,导致指标和标签缺乏统一管理,这使得数据管理和使用的难度都变高。
• Dataset与物理位置绑定,应用层无法进行透明优化,如果Doris引擎出现负载较高的情况,无法通过降低用户查询避免集群负载过高报错的问题。
数据架构3.0
针对指标和标签定义口径不统一,数据使用和管理难度较高的问题,我们继续对架构进行升级。数据架构3.0主要的变化是引入了专门的语义层,语义层的主要作用是将技术语言转换为业务部门更容易理解的概念,目的是将标签(tag)与指标(metric)变为“一等公民”,作为数据定义与管理的基本对象。
引入语义层的优势有:
• 对于技术来说,应用层不再需要创建DataSet,从语义层可直接获取特定内容对象的标签集(tagset)和指标集(metricset)来发起查询。
• 对于数据分析师来说,可统一在语义层定义和创建衍生的指标和标签,解决了定义口径不一致、管理和使用难度较高的问题。
• 对于业务来说,无需耗费过长时间考虑什么场景应选择哪个数据集使用,语义层对标签和指标透明统一的定义提升了工作效率、降低了使用成本。
存在的问题:
从架构图可知,标签和指标等数据均处于下游位置,虽然标签与指标在语义层被显式定义,但仍然无法影响上游链路,数仓层有自己的语义逻辑,加速层有自己的导入配置,这样就造成了数据管理机制的割裂。
数据架构4.0
在数据架构3.0的基础上,我们对语义层进行更深层次的应用,在数据架构4.0中,我们将语义层变为架构的中枢节点,目标是对所有的指标和标签统一定义,从计算-加速-查询实现中心化、标准化管理,解决数据管理机制割裂的问题。
语义层作为架构中枢节点所带来的变化:
• 数仓层:语义层接收SQL触发计算或查询任务。数仓从DWD到DWS的计算逻辑将在语义层中进行定义,且以单个指标和标签的形式进行定义,之后由语义层来发送命令,生成SQL命令给数仓层执行计算。
• 加速层:从语义层接收配置、触发导入任务,比如加速哪些指标与标签均由语义层指导。
• 应用层:向语义层发起逻辑查询,由语义层选择引擎,生成物理SQL。
架构优势:
• 可以形成统一视图,对于核心指标和标签的定义进行统一查看及管理。
• 应用层与物理引擎完成解耦,可进一步对更加灵活易用的架构进行探索:如何对相关指标和标签进行加速,如何在时效性和集群的稳定性之间平衡等。
存在的问题:
因为当前架构是对单个标签和指标进行了定义,因此如何在查询计算时自动生成一个准确有效的SQL语句是非常有难度的。如果你有相关的经验,期待有机会可以一起探索交流。
优化经验
从上文已知,为更好地实现业务需求,数据架构演进到4.0版本,其中Apache Doris作为分析加速场景的解决方案在整个系统中发挥着重要的作用。接下来将从场景需求、数据导入、查询优化以及成本优化四个方面出发,分享基于Doris的读写优化经验,希望给读者带来一些参考。
场景需求
目前我们有800+标签,1300+指标,对应TDW中有80+Source表,单个标签、指标的最大基数达到了2亿+。我们希望将这些数据从TDW加速到Doris中完成标签画像和指标的分析。从业务的角度,需要满足以下要求:
• 实时可用:标签/指标导入以后,需实现数据尽快可用。不仅要支持常规离线导入T+1,同时也要支持实时打标场景。
• 部分更新:因每个Source表由各自ETL任务产出对应的数据,其产出时间不一致,并且每个表只涉及部分指标或标签,不同数据查询对时效性要求也不同,因此架构需要支持部分列更新。
• 性能高效:具备高效的写入能力,且在圈选、洞察、报表等场景可以实现秒级响应。
• 控制成本:在满足业务需求的前提下,最大程度地降低成本;支持冷热数据精细化管理,支持标签灵活上下架。
数据导入方案
为了减轻Doris写入压力,我们考虑在数据写入Doris之前,尽量将数据生成宽表,再写入到Doris中。针对宽表的生成,我们有两个实现思路:第一个是在TDW数仓中生成宽表;第二个是Flink中生成宽表。我们对这两个实现思路进行了实践对比,最终决定选择第二个实现思路,原因如下:
在TDW中生成宽表,虽然链路简单,但是弊端也比较明显。
• 存储成本较高,TDW除了要维护离散的80+个Source表外,还需维护1个大宽表、2份冗余的数据。
• 实时性比较差,由于每个Source表产出的时间不一样,往往会因为某些延迟比较大的Source表导致整个数据链路延迟增大。
• 开发成本较高,该方案只能作为离线方式,若想实现实时方式则需要投入开发资源进行额外的开发。
而在Flink中生成宽表,链路简单、成本低也容易实现,主要流程是:首先用Spark将相关Source表最新数据离线导入到Kafka中, 接着使用Flink来消费Kafka,并通过主键ID构建出一张大宽表,最后将大宽表导入到Doris中。如下图所示,来自数仓N个表中ID=1的5条数据,经过Flink处理以后,只有一条ID=1的数据写入Doris中,大大减少Doris写入压力。
通过以上导入优化方案,极大地降低了存储成本,TDW无需维护两份冗余的数据,Kafka也只需保存最新待导入的数据。同时该方案整体实时性更好且可控,并且大宽表聚合在Flink中执行,可灵活加入各种ETL逻辑,离线和实时可对多个开发逻辑进行复用,灵活度较高。
数据模型选择
目前我们生产环境所使用的版本为Apache Doris 1.1.3,我们对其所支持的Unique主键模型、Aggregate聚合模型和Duplicate明细模型进行了对比,相较于Unique模型和Duplicate模型,Aggregate聚合模型满足我们部分列更新的场景需求:
Aggregate聚合模型可以支持多种预聚合模式,可以通过REPLACE_IF_NOT_NULL的方式实现部分列更新。数据写入过程中,Doris会将多次写入的数据进行聚合,最终用户查询时,返回一份聚合后的完整且正确的数据。
另外两种数据模型适用的场景,这里也进行简单的介绍:
• Unique模型适用于需要保证Key唯一性场景,同一个主键ID多次导入之后,会以append的方式进行行级数据更新,仅保留最后一次导入的数据。在与社区进行沟通后,确定后续版本Unique模型也将支持部分列更新。
• Duplicate模型区别于Aggregate和Unique模型,数据完全按照导入的明细数据进行存储,不会有任何预聚合或去重操作,即使两行数据完全相同也都会保留,因此Duplicate模型适用于既没有聚合需求,又没有主键唯一性约束的原始数据存储。
确定数据模型之后,我们在建表时如何对列进行命名呢?可以直接使用指标或者是标签的名称吗?
在使用场景中通常会有以下几个需求:
• 为了更好地表达数据的意义,业务方会有少量修改标签、指标名称的需求。
• 随着业务需求的变动,标签经常存在上架、下架的情况。
• 实时新增的标签和指标,用户希望数据尽快可用。
Doris 1.1.3是不支持对列名进行修改的,如果直接使用指标/标签名称作为列名,则无法满足上述标签或指标更名的需求。而对于上下架标签的需求,如果直接以drop/addcolumn的方式实现,则会涉及数据文件的更改,该操作耗时耗力,甚至会影响线上查询的性能。
那么,有没有更轻量级的方式来满足需求呢?接下来将为大家分享相关解决方案及收益:
• 为了实现少量标签、指标名称修改,我们用MySQL表存储相应的元数据,包括名称、全局唯一的ID和上下架状态等信息,比如标签歌曲名称song_name的ID为4,在Doris中存储命名为a4,用户使用更具有业务含义song_name进行查询。在查询Doris前,我们会在查询层将SQL改写成具体的列名a4。这样名称的修改只是修改其元数据,底层Doris的表结构可以保持不变。
• 为了实现标签灵活上下架,我们通过统计标签的使用情况来分析标签的价值,将低价值的标签进入下架流程。下架指的是对元信息进行状态标注,在下架标签重新上架之前,不会继续导入其数据,元信息中数据可用时间也不会发生变化。
• 对于实时新增标签/指标,我们基于名称ID的映射在Doris表中预先创建适量ID列,当标签/指标完成元信息录入后,直接将预留的ID分配给新录入的标签/指标,避免在查询高峰期因新增标签/指标所引起的Schema Change开销对集群产生的影响。
经测试,用户在元信息录入后10分钟内就可以使用相应的数据。
值得关注的是,在社区近期发布的1.2.0版本中,增加了Light Schema Change功能,对于增减列的操作不需要修改数据文件,只需要修改FE中的元数据,从而可以实现毫秒级的Schame Change操作。同时开启Light Schema Change功能的数据表也可以支持列名的修改,这与我们的需求十分匹配,后续我们也会及时升级到最新版本。
写入优化
接着我们在数据写入方面也进行了调整优化,这里几点小经验与大家分享:
• Flink预聚合:通过主键ID预聚合,减少写入压力。(前文已说明,此处不再赘述)
• 写入Batch大小自适应变更:为了不占用过多Flink资源,我们实现了从同一个Kafka Topic中消费数据写入到不同Doris表中的功能,并且可以根据数据的大小自动调整写入的批次,尽量做到攒批低频写入。
• Doris写入调优:针对-235报错进行相关参数的调优。比如设置合理的分区和分桶(Tablet建议1-10G),同时结合场景对Compaction参数调优:
max_XXXX_compaction_thread max_cumulative_compaction_num_singleton_deltas
• 优化BE提交逻辑:定期缓存BE列表,按批次随机提交到BE节点,细化负载均衡粒度。
优化背景:在写入时发现某一个BE负载会远远高于其他的BE,甚至出现OOM。结合源码发现:作业启动后会获取一次BE地址列表,从中随机选出一个BE作为Coordinator协调者,该节点主要负责接收数据、并分发到其他的BE节点,除非作业异常报错,否则该节点不会发生切换。
对于少量Flink作业大数据场景会导致选中的BE节点负载较高,因此我们尝试对BE提交逻辑进行优化,设置每1小时缓存一次BE列表,每写入一个批次都随机从BE缓存列表中获取一个进行提交,这样负载均衡的粒度就从job级别细化到每次提交的批次,使得BE间负载更加的均衡,这部分实现我们已经贡献到社区,欢迎大家一起使用并反馈。
• https://github.com/apache/doris-spark-connector/pull/59
通过以上数据导入的优化措施,使得整体导入链路更加稳定,每日离线导入时长下降了75%,数据版本累积情况也有所改善,其中cumu compaction的合并分数更是从600+直降到100左右,优化效果十分明显。
查询优化
目前我们的场景指标数据是以分区表的形式存储在Doris中,ES保留一份全量的标签数据。在我们的使用场景中,标签圈选的使用率很高,大约有60%的使用场景中用到了标签圈选,在标签圈选场景中,通常需要满足以下几个要求:
• 用户圈选逻辑比较复杂,数据架构需要支持同时有上百个标签做圈选过滤条件。
• 大部分圈选场景只需要最新标签数据,但是在指标查询时需要支持历史的数据的查询。
• 基于圈选结果,需要进行指标数据的聚合分析。
• 基于圈选结果,需要支持标签和指标的明细查询。
经过调研,我们最终采用了Doris on ES的解决方案来实现以上要求,将Doris的分布式查询规划能力和ES的全文检索能力相结合。Doris on ES主要查询模式如下所示:
SELECT tag, agg(metric) FROM Doris WHERE id in (select id from Es where tagFilter) GROUP BY tag
在ES中圈选查询出的ID数据,以子查询方式在Doris中进行指标分析。
我们在实践中发现,查询时长跟圈选的群体大小相关。如果从ES中圈选的群体规模超过100万时,查询时长会达到60秒,圈选群体再次增大甚至会出现超时报错。经排查分析,主要的耗时包括两方面:
• BE从ES中拉取数据(默认一次拉取1024行),对于100万以上的群体,网络IO开销会很大。
• BE数据拉取完成以后,需要和本地的指标表做Join,一般以SHUFFLE/BROADCAST的方式,成本较高。
针对这两点,我们进行了以下优化:
• 增加了查询会话变量es_optimize,以开启优化开关;
• 数据写入ES时,新增BK列用来存储主键ID Hash后的分桶序号,算法和Doris的分桶算法相同(CRC32);
• BE生成Bucket Join执行计划,将分桶序号下发到BE ScanNode节点,并下推到ES;
• ES对查询出的数据进行Bitmap压缩,并将数据的多批次获取优化为一次获取,减少网络IO开销;
• Doris BE只拉取和本地Doris指标表相关Bucket的数据,直接进行本地Join,避免Doris BE间数据再Shuffle的过程。
通过以上优化措施,百万分群圈选洞察查询时间从最初的60秒缩短到3.7秒,性能显著提升!
经过与社区沟通交流,Apache Doris从2.0.0版本开始,将支持倒排索引。可进行文本类型的全文检索;支持中文、英文分词;支持文本、数值日期类型的等值和范围过滤;倒排索引对数组类型也提供了支持,多个过滤条件可以任意进行AND OR NOT逻辑组合。由于高性能的向量化实现和面向AP数据库的精简优化,Doris的倒排索引相较于ES会有3~5倍性价比提升,即将在2月底发布的2.0 preview版本中可用于功能评估和性能测试,相信在这个场景使用后会有进一步的性能提升。
成本优化
在当前大环境下,降本提效成为了企业的热门话题,如何在保证服务质量的同时降低成本开销,是我们一直在思考的问题。在我们的场景中,成本优化主要得益于Doris自身优秀的能力,这里为大家分享两点:
1、冷热数据进行精细化管理。
• 利用Doris TTL机制,在Doris中只存储近一年的数据,更早的数据放到存储代价更低的TDW中;
• 支持分区级副本设置,3个月以内的数据高频使用,分区设置为3副本;3-6个月数据分区调整为2副本;6个月之前的数据分区调整为1副本;
• 支持数据转冷,在SSD中仅存储最近7天的数据,并将7天之前的数据转存到到HDD中,以降低存储成本;
• 标签上下线,将低价值标签和指标下线处理后,后续数据不再写入,减少写入和存储代价。
2、降低数据链路成本。
Doris架构非常简单,只有FE和BE两类进程,不依赖其他组件,并通过一致性协议来保证服务的高可用和数据的高可靠,自动故障修复,运维起来比较容易;
• 高度兼容MySQL语法,支持标准SQL,极大降低开发人员接入使用成本;
• 支持多种联邦查询方式,支持对Hive、MySQL、Elasticsearch、Iceberg等组件的联邦查询分析,降低多数据源查询复杂度。
通过以上的方式,使得存储成本降低42%,开发与时间成本降低了40%,成功实现降本提效,后续我们将继续探索!
未来规划
未来我们还将继续进行迭代和优化,我们计划在以下几个方向进行探索:
• 实现自动识别冷热数据,用Apache Doris存储热数据,Iceberg存储冷数据,利用Doris湖仓一体化能力简化查询。
• 对高频出现的标签/指标组合,通过Doris的物化视图进行预计算,提升查询的性能。
• 探索Doris应用于数仓计算任务,利用物化视图简化代码逻辑,并提升核心数据的时效性。
最后,感谢Apache Doris社区和SelectDB的同学,感谢其快速响应和积极支持,未来我们也会持续将相关成果贡献到社区,希望Apache Doris飞速发展,越来越好!