小白理解 Apache Kylin 到源码学习

小白理解 Apache Kylin 到源码学习

用户视角

你是否面对海量数据查询缓慢而备受煎熬 ?受限于查询速度而只能屈服于数据量?想在同一个分析工具下分析不同来源的数据?想在不同的 BI 工具下仍具有敏捷的查询响应速度?在 Apache Kylin 面前这些都不是问题,一图胜千言。

img

好了,你可能想说:“前面的问题你倒是快给我解决啊,这图我表示看不懂”。别急,“工欲善其事,必先利其器”,对于基本的概念我们得了解,方能压住这“麒麟”神兽。

根据文章开头提出的系列问题能了解到最基本的一点,Apache Kylin 能够解决海量数据查询的问题。既然要查询,首先我们需要有个查询的入口,Kylin 提供了标准 SQL 作为对外查询的接口,你只需要和平时写 SQL 一样进行数据查询即可,只是换了个查询的平台,不用关心如何解决查询缓慢的问题,因为 Kylin 在背后帮你解决了这个问题。

“既然不用我了解如何做到海量数据亚秒级查询的,那总得让我明白怎么把需要查询的数据给 Kylin 吧?”

这个好说,最早期 Kylin 只支持 Hive 作为数据源,数据源理解为你需要查询的真实数据,如今 Kylin 不仅支持 Kafka 消息流作为数据源,还支持 RDBMS 等数据源,比如 Mysql,下面拿 Hive 举例讲一讲数据源。

Hive 本身的数据源不强制使用特定文件格式,你可以将 csv 文件作为 Hive 数据的来源,将 csv 文件中的数据存储到 Hive 中,尽管此时你可以直接通过 Hive 查询数据,但面对海量数据效率是很低的。于是在 Kylin 中我们仅仅将 Hive 作为数据存储的介质,提供数据的来源,随后我们会采用一种叫预计算的方式将要查询的数据存储到 Cube 中,Kylin 查询 Cube 中已经计算好的数据。

以上对于整个 Kylin 有个最浅的认识,一句话概括就是只要我们提供数据源,Kylin 便可以实现海量数据的秒级甚至亚秒级查询,飞一般的感觉。那么它究竟是如何做到的呢?

原理理解

Kylin 能够实现亚秒级查询,是运用了预计算这个概念,预计算字面上容易理解,就是提前计算好,查询时直接返回结果。 对于分析师而言,明细数据基本上用不着,而是需要根据业务逻辑聚合之后的数据。比如商家需要根据某个月份商品的销量调整库存,此时只需要知道该月份该商品的总销量即可,对于每天的销量数据在这种业务场景下没有利用价值。此时便可以利用预计算的思想,提前将该月份该商品总销量计算出来,当业务分析师进行查询时,直接返回计算好的结果即可,这就是预计算的简单理解。

在 Kylin 中,应对的不是这种简单的计算场景,而是对海量数据,多维度的计算。既然想要很好的利用预计算的思想,那如何让 Kylin 提前准备好这些计算后的结果呢?此时就需要建模,通过建模告诉 Kylin 你想要根据哪些维度计算出什么值。Kylin 根据定义好的模型,按照不同的维度组合进行计算,相当于业务分析师从数据的不同角度提出了不同的计算需求,Kylin 将各种维度组合计算出来,每一个计算结果保存成一个物化视图,称为 Cuboid,所有维度计算的结果组合在一起,也就是所有 Cuboid 作为一个整体,称为 Cube。

根据各种维度聚合之后计算的值已经存在 Cube 中了,那么分析师输入的 SQL 便不需要查数据源中的数据,直接查询 Cube 中计算好的数据即可,当然查询 Cube 还需要将 SQL 查询转化成 Cube 的查询,这个过程称为后计算。此时可以再看一遍文章开头的架构图,用户输入 SQL,通过各种 Rest API、JDBC/ODBC 接口,来到 Rest 服务层,再转交给查询引擎,查询引擎会解析 SQL,生成基于关系表的逻辑执行计划,然后将其转译为基于 Cube 的物理执行计划,最后查询预计算生成的 Cube 并产生结果。关系表是数据的输入形式,符合星型模型或雪花模型,简单理解为一种关系模型就好了。

你可能会产生疑问,一直提到 Cube,感觉它只是一个抽象的概念,那它到底存在哪呢?Cube 的存储实际上由存储引擎实现,Kylin 将其存储在 HBase 中。Kylin 将存储引擎进行了抽象,可以使用不同的存储。就如上文提到的数据源,它也经过抽象,可以实现不同的数据源,简单类比为 Java 中的接口可以有不同的实现类。

从问题层层递进

“Cube 是如何生成的呢?”

上面我们提到 Cube 是 Cuboid 的组合,Cuboid 通过维度组合计算出来,重点是如何计算每个 Cuboid 维度组合的值,当没有值时,Cube 其实就相当于是一个骨架,没有肉。而 Cube 构建引擎正是要填充这个骨架,也就是将数据源中的数据根据维度聚合,计算出相应的值,填充到对应的 Cuboid,Cube 采用分层构建,先计算出 Base Cuboid,也就是包含所有维度的一个 Cuboid,再逐层构建,最后存储到 HBase 中。实际上 Cube 构建远不止这么简单,中间还有很多原理和细节,并且 Cube 构建算法不仅有逐层构建,还有 Fast Cubing 算法,需要进一步探索。

“如果我又新产生了数据该怎么办呢?再重复上一次的过程,加载所有数据进行 Cube 构建吗?”

Cube 有两种构建方式,全量构建和增量构建。从字面也比较容易理解,全量就是读取数据源中的所有数据,增量就是读取数据源中相对于某一个时间段新产生的数据,也就是每次构建只读取一个时间范围的数据。这里又产生一个新的概念,叫 Segment,可以理解为根据时间段进行构建的小 Cube,Segment 可以根据配置以一定的方式进行合并,比如每满两个月的数据进行一次合并,全量构建实际上就是只有一个 Segment,它没有根据时间进行分割。由此可以看出全量构建通常适用于事实表的数据不随时间增长或事实表的数据比较小、更新频率很低的场景,只有在这样的场景下,全量构建才不会造成大开销。而现实业务中,数据会不断增加,增量构建才是应用最广泛构建方式。

“Cube 中一定能满足我所有查询的情况吗?”

当然不是,Cube 根据维度覆盖到的是大多数查询情况,但并非所有,所以对于 Cube 中查询不到的时候,Kylin 也能够查询数据源中的数据,这个概念就叫查询下压,此时查询的速度会较慢。

“为什么预计算有这么明显的优势,MPP 大规模并行处理和列式存储难道不能解决这些问题吗?”

还真不能。MPP 并行处理是通过增加物理资源,提高并行计算能力,在机器一定的情况下,当数据量不断增加,计算时间也将线性增加,为了计算海量数据不断增加大量机器那我岂不是得亏本。列式存储是将数据记录按列存放,可以提高读取的效率,但查询性能与数据量呈线性相关仍然无法改变。通过上面的了解你已经知道,Kylin 通过预计算的方式打破查询时间随数据量呈线性增长的规律,这就是 Kylin 关键的思想,通过预计算,用空间换时间。你或许听过交互式分析这个词,如果只有 MPP 和列式存储,面对海量数据无法实现交互式,Kylin 则解决了这个问题,你再也不用点击查询再去喝一杯咖啡了。

通过以上对 Kylin 的基本了解,再看看文章开头的图,相信你会有新的理解。我们来总结一下对 Apache Kylin 的认识,Kylin 通过预计算思想,利用大数据计算引擎,将数据源中的海量数据根据维度进行计算,并按一定格式存储到 HBase 中,解决海量数据场景下,查询速度不随数据量增长而线性增长的问题。

进一步探索 Cube 构建

接下来我们着重了解下 Cube 构建的过程。通过以上内容的学习,我们了解到,Cube 构建实际上是将 Hive 表中的数据转化成 HBase 的存储结构,也就是 Key-Value 键值对的形式,HBase 中存储的数据是依据维度计算好的结果。下面学习下 Cube 构建的详细流程。

  1. Create Intermediate Flat Hive Table(创建大平表)

当我们提交 Cube 构建任务时,Kylin 会创建一张临时的 Hive 平表,根据模型中设置的列,从 Hive 数据源中抽取出来并插入到平表中,后续的构建就基于这张中间表进行。通过 hive cli 查询 hive 表可以看到,当我们点击构建后,多了一张名为 kylin_intermediate 开头的中间表。
hive 表
生成中间表

  1. Redistribute Flat Hive Table(重分布数据)

在第一步会将抽出来形成中间表的数据存储在 hdfs 上,这些数据文件大小不均匀,后续 Map 任务处理时间长短会不一致,通过 Hive 的 INSERT INTO … DISTRIBUTE BY 重分布语句将处理数据的任务均衡。

重分布数据

  1. Extract Fact Table Distinct Columns(获取不同维度的值)
    获取不同维度的值
    获取不同维度的值
    ​ 这个步骤将启动 MapReduce 任务对中间表中每个维度进行抽取,用于下一步构建维度字典。

  2. Build Dimension Dictionary(构建维度字典)

通过上一步的维度值构建字典,就是将维度值映射成编码,可以节约存储资源,HBase 的 RowKey 中就不需要使用维度值了,直接使用其对应的编码就可以。

  1. Save Cuboid Statistics(保存 Cuboid 数据)

保存 Cuboid 的相关统计信息。

  1. Create HTable(创建 HTable)

创建 HBase 表用于保存之后要生成的 Cube 数据。

  1. Build Base Cuboid(构建 Base Cuboid)

计算 Base Cuboid(所有维度的组合)

  1. Build N-Dimension Cuboid : level 1… N(构建每一层 Cuboid)

计算除 Base Cuboid 之外的 Cuboid。

  1. Build Cube In-Mem

上面两步是逐层构建 Cuboid,这一步是 In-Mem 快速构建,对内存消耗较高,构建速度更快,在具体执行时会自动选择一种构建方式,没被选中的方式会自动跳过。

通过查看任务详情,我在本例中该步骤被跳过,使用的是逐层构建的方式。
Build Cube In-Mem

  1. Convert Cuboid Data to HFile

HFile 是 HBase 持久化的存储文件,也就是 HBase 存储数据的文件形式。这一步会将构建好的 Cuboid 转换成 HFile。

  1. Load HFile to HBase Table

将 HFile 转成 HBase 中的数据。

  1. Update Cube Info

更新 Cube 相关信息,此时 Cube 已经准备好,更改 Cube 状态后可进行查询。

  1. Hive Cleanup

清理 Hive 中间表,通过查看 Hive 的表,发现第一步生成的中间表已经被删除了。
Hive Cleanup

  1. Garbage Collection on HBase

清理元数据信息中的临时数据,比如第 3 步中统计的不同维度,已经构建好字典了,这些数据不再需要使用,从 output 中可以看到删除掉的相关元数据。
Garbage Collection

学习 Cube 构建过程的源码

通过页面点击进行构建,查看请求 api,查看 Kylin 源码,可以看到从前端页面点击构建请求,最终调用了 CubeControllerrebuild 函数,返回一个 JobInstance 实例
request

/**
 * Build/Rebuild a cube segment
 */
@RequestMapping(value = "/{cubeName}/rebuild", method = { RequestMethod.PUT }, produces = { "application/json" })
@ResponseBody
public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest req) {
    return buildInternal(cubeName, new TSRange(req.getStartTime(), req.getEndTime()), null, null, null,
            req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment(), req.getPriorityOffset());
}

上一步仅仅是接收请求,获取部分参数。

private JobInstance buildInternal(String cubeName, TSRange tsRange, SegmentRange segRange, //
        Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd,
        String buildType, boolean force, Integer priorityOffset) {
    try {
       //Returns the name of this principal,获取提交该任务的用户
        String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
        // 根据 Cube 名称获取 Cube 实例
        CubeInstance cube = jobService.getCubeManager().getCube(cubeName);

        // 检测 Cube 构建的任务数量,默认是 10 个,如果超过了则抛出异常,最终调用了以下配置获得最大构建任务数
        //public int getMaxBuildingSegments() {
        //  return Integer.parseInt(getOptional("kylin.cube.max-building-segments", "10"));
    //    }
        checkBuildingSegment(cube);

        //提交 Cube 构建任务
        return jobService.submitJob(cube, tsRange, segRange, sourcePartitionOffsetStart, sourcePartitionOffsetEnd,
                CubeBuildTypeEnum.valueOf(buildType), force, submitter, priorityOffset);
    } catch (Throwable e) {
        logger.error(e.getLocalizedMessage(), e);
        throw new InternalErrorException(e.getLocalizedMessage(), e);
    }
}

查看 submitJob 方法。

public JobInstance submitJob(CubeInstance cube, TSRange tsRange, SegmentRange segRange, //
        Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd,
        CubeBuildTypeEnum buildType, boolean force, String submitter, Integer priorityOffset) throws IOException {
    //通过 Cube 实例进一步获取 Project 实例,通过封装好的权限检查工具 AclUtil 进行权限检查。
    aclEvaluate.checkProjectOperationPermission(cube);
    // 提交任务
    JobInstance jobInstance = submitJobInternal(cube, tsRange, segRange, sourcePartitionOffsetStart,
            sourcePartitionOffsetEnd, buildType, force, submitter, priorityOffset);

    return jobInstance;
}

查看 submitJobInternal 方法。

public JobInstance submitJobInternal(CubeInstance cube, TSRange tsRange, SegmentRange segRange, //
        Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, //
        CubeBuildTypeEnum buildType, boolean force, String submitter, Integer priorityOffset) throws IOException {
    Message msg = MsgPicker.getMsg();

    // 检测 Cube 的状态
    if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
        throw new BadRequestException(String.format(Locale.ROOT, msg.getBUILD_BROKEN_CUBE(), cube.getName()));
    }

    // 根据 Cube 的元信息,进行签名校验,将已有签名与计算出来的进行比较,判断 Cube 信息的正确性
    checkCubeDescSignature(cube);
    // 检测 Cube 是否符合构建状态,进一步查看源码可知如果已经有处于 pending 状态的 segment,则该 Cube 不能进行构建,抛出异常
    checkAllowBuilding(cube);
    // 检测是否能并行构建
    if (buildType == CubeBuildTypeEnum.BUILD || buildType == CubeBuildTypeEnum.REFRESH) {
        checkAllowParallelBuilding(cube);
    }

    DefaultChainedExecutable job;

    CubeSegment newSeg = null;
    try {
        if (buildType == CubeBuildTypeEnum.BUILD) {
            // 获取数据源类型,目前支持 Hive,JDBC,Kafka
            ISource source = SourceManager.getSource(cube);
            // 获得构建范围
            SourcePartition src = new SourcePartition(tsRange, segRange, sourcePartitionOffsetStart,
                    sourcePartitionOffsetEnd);
            // 在此填充数据,个人理解为对于实时数据,在执行完前面的步骤后又产生了时间间隔,将这部分数据加载进来,并添加一个 segment
            src = source.enrichSourcePartitionBeforeBuild(cube, src);
            newSeg = getCubeManager().appendSegment(cube, src);
            // 通过构建引擎生成新的 job 任务
            job = EngineFactory.createBatchCubingJob(newSeg, submitter, priorityOffset);
        } else if (buildType == CubeBuildTypeEnum.MERGE) {
            newSeg = getCubeManager().mergeSegments(cube, tsRange, segRange, force);
            job = EngineFactory.createBatchMergeJob(newSeg, submitter);
        } else if (buildType == CubeBuildTypeEnum.REFRESH) {
            newSeg = getCubeManager().refreshSegment(cube, tsRange, segRange);
            job = EngineFactory.createBatchCubingJob(newSeg, submitter, priorityOffset);
        } else {
            throw new BadRequestException(String.format(Locale.ROOT, msg.getINVALID_BUILD_TYPE(), buildType));
        }

        // 将任务添加到任务调度系统
        getExecutableManager().addJob(job);

    } catch (Exception e) {
        if (newSeg != null) {
            logger.error("Job submission might failed for NEW segment {}, will clean the NEW segment from cube",
                    newSeg.getName());
            try {
                // Remove this segment
                getCubeManager().updateCubeDropSegments(cube, newSeg);
            } catch (Exception ee) {
                // swallow the exception
                logger.error("Clean New segment failed, ignoring it", e);
            }
        }
        throw e;
    }

    // 返回任务实例信息
    JobInstance jobInstance = getSingleJobInstance(job);

    return jobInstance;
}

查看创建 job 任务 createBatchCubingJob 方法,最终会根据构建引擎,创建相应的 builder 实例并调用 build 方法。

// 该方法用于返回构建引擎,根据下图中可看出支持 MR 和 Spark
public static IBatchCubingEngine batchEngine(IEngineAware aware) {
        ImplementationSwitch<IBatchCubingEngine> current = engines.get();
        if (current == null) {
            current = new ImplementationSwitch<>(KylinConfig.getInstanceFromEnv().getJobEngines(),
                    IBatchCubingEngine.class);
            engines.set(current);
        }
        return current.get(aware.getEngineType());
}

// 这里是先根据配置获得构建引擎,创建对应的 builder 实例并调用 build 方法。
public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset) {
    return batchEngine(newSegment).createBatchCubingJob(newSegment, submitter, priorityOffset);
}

implementation

先来看看 MR 构建引擎的 build 方法

public CubingJob build() {
    logger.info("MR_V2 new job to BUILD segment " + seg);

    // 获得一个初始化的 Job 实例
    final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
    final String jobId = result.getId();
    // 获取 cuboid 的数据路径,以配置的 working-dir 开头
    final String cuboidRootPath = getCuboidRootPath(jobId);

    // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
    inputSide.addStepPhase1_CreateFlatTable(result);

    // Phase 2: Build Dictionary
    result.addTask(createFactDistinctColumnsStep(jobId));

    // 判断是否是高基维(UHC),如果是则添加新的任务对高基维进行处理
    if (isEnableUHCDictStep()) {
        result.addTask(createBuildUHCDictStep(jobId));
    }

    // 构建字典
    result.addTask(createBuildDictionaryStep(jobId));

    // 保存 cuboid 统计数据
    result.addTask(createSaveStatisticsStep(jobId));

    // add materialize lookup tables if needed
    LookupMaterializeContext lookupMaterializeContext = addMaterializeLookupTableSteps(result);

    // 创建 HTable
    outputSide.addStepPhase2_BuildDictionary(result);

    if (seg.getCubeDesc().isShrunkenDictFromGlobalEnabled()) {
        result.addTask(createExtractDictionaryFromGlobalJob(jobId));
    }

    // Phase 3: Build Cube
    addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
    addInMemCubingSteps(result, jobId, cuboidRootPath); // inmem cubing, only selected algorithm will execute
    outputSide.addStepPhase3_BuildCube(result);

    // Phase 4: Update Metadata & Cleanup
    result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext));
    inputSide.addStepPhase4_Cleanup(result);
    outputSide.addStepPhase4_Cleanup(result);

    // Set the task priority if specified
    result.setPriorityBasedOnPriorityOffset(priorityOffset);
    result.getTasks().forEach(task -> task.setPriorityBasedOnPriorityOffset(priorityOffset));

    return result;
}

从源码中可以看到,该流程与页面上 Cube 构建的任务流程基本一致,并且在 Phase 3:Build Cube 处有两种构建算法,只有被选中的算法才会被最终执行。

再看看 Spark 的 build 方法实现,Spark 的 build 方法流程和 MR 的基本上一致,不再做注释,但构建算法处有一些不同,Spark 构建引擎中只使用了分层构建算法,至于这两种算法的具体原理以及在两种构建引擎中选用的区别,将在之后的文章中做进一步探讨。

public CubingJob build() {
    logger.info("Spark new job to BUILD segment " + seg);

    final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
    final String jobId = result.getId();
    final String cuboidRootPath = getCuboidRootPath(jobId);

    // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
    inputSide.addStepPhase1_CreateFlatTable(result);

    // Phase 2: Build Dictionary
    KylinConfig config = KylinConfig.getInstanceFromEnv();
    if (config.isSparkFactDistinctEnable()) {
        result.addTask(createFactDistinctColumnsSparkStep(jobId));
    } else {
        result.addTask(createFactDistinctColumnsStep(jobId));
    }

    if (isEnableUHCDictStep()) {
        result.addTask(createBuildUHCDictStep(jobId));
    }

    result.addTask(createBuildDictionaryStep(jobId));
    result.addTask(createSaveStatisticsStep(jobId));

    // add materialize lookup tables if needed
    LookupMaterializeContext lookupMaterializeContext = addMaterializeLookupTableSteps(result);

    outputSide.addStepPhase2_BuildDictionary(result);

    // Phase 3: Build Cube
    addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
    outputSide.addStepPhase3_BuildCube(result);

    // Phase 4: Update Metadata & Cleanup
    result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext));
    inputSide.addStepPhase4_Cleanup(result);
    outputSide.addStepPhase4_Cleanup(result);

    // Set the task priority if specified
    result.setPriorityBasedOnPriorityOffset(priorityOffset);
    result.getTasks().forEach(task -> task.setPriorityBasedOnPriorityOffset(priorityOffset));

    return result;
}

本文从 Apache Kylin 最基本的认识到源码分析,一步步探索了 Kylin 关键部分的流程,但是本文还处于比较表层的理解,还有非常多可以深入学习的点以及算法有待进一步学习。

文章内容可能不严谨或有错误,欢迎指出。

参考来源:

《Apache Kylin 权威指南》

https://kyligence.io/zh/blog/optimize-kylin-cube-build-performance/

https://blog.csdn.net/zengrui_ops/article/details/85858860

i

发表评论

电子邮件地址不会被公开。 必填项已用*标注