Airbyte 介绍

2022/09/03

Airbyte & Fivetran: 从 ETL 到 EL

随着云数仓(Cloud Data Warehouse,比如:Snowflake、Redshift、BigQuery等)的飞速发展,对于数据集成,数据处理等领域都带来了很多新变化。从ETL逐渐变为了ELT, 而T(Transform)如果是dbt的强项,那其它组件就可以专注于做EL,即从数据源读取数据,然后写入到云数仓中。SaaS业务系统的流行,带来了大量的非关系型数据库的数据源,这也带动了新的工具的出现。

这个领域最有名应该是Fivetran,正如很多流行的SaaS服务,都会有一些开源替代一样,这个开源数据集成领域中,Airbyte是其中最流行的一个。

Airbyte的简单流程:

  1. 从Airbyte支持的“Sources”中选择想要连接的数据源,并配置相应信息

1

  1. 从Airbyte支持的“Destinations”中选择数据写入的目标,并配置相应信息。

2

  1. 有了数据源和目标的信息,我们就可以建立一条连接(connection),并设置一个同步周期来同步数据了

3

至此这个数据同步任务算是完成配置,可以按照周期定时跑起来了。

Airbyte看似功能单一,但正如很多流行的开源软件一样,Airbyte强大的地方是:它定义了一个被广泛接受的标准,并形成了一个庞大的“连接器”(Connectors)生态。

Airbyte Connectors

我们知道,编写某一个数据源的连接器并不难,但是这个价值有限。但是当一个平台支持了大量的连接器,比如:支持200个Source类型连接器,100 个Destination类型的连接器,那么其可以带来 2万 个不同的组合,这个是非常有价值的。

而这些不同的连接器往往是不同时期、不同人来编写的(每个人的需求不同),甚至使用了不同的编程语言。比如:当连接API数据集时,Python就非常方便,而如果连接的是关系型数据库,那Java的JDBC使得能支持更多的数据库。但无论Python还是Java,其程序包依赖都很复杂和易错。那Airbyte的connector怎么解决这个问题?Airbyte规定每个connector都放在一个完整的docker镜像中,这个镜像包含了所有运行的依赖,我们甚至可以直接运行和使用这些connector的docker镜像。在Airbyte的界面上,我们可以看到其支持的一些connector信息:

4

比如:Faker这个Source类型的连接器,其作用主要是调用Faker库造些测试用的假数据。其docker镜像是: airbyte/source-faker:0.1.0

关于Airbyte的Connector需要遵守的规范,可以参考其文档: Airbyte Specification 但是估计看起来有些繁琐,那我们来通过示例来简化一下吧。

我们就来拿 Faker 这个最简单的Source类型的连接器来演示一下吧,作为Source Connector,其需要实现如下4个API (命令行工具的4个子命令):

  • spec
  • check
  • discover
  • read

spec 命令:定义配置该connector需要的参数

我们直接运行:

docker run -v /tmp:/tmp --rm airbyte/source-faker:0.1.0 spec

会得到如下的json输出:

5

通过这个json schema,我们可以看出这个插件需要两个参数:

  • count,其是必须填写的,类型是integer
  • seed, 非必须,类型也是integer

check 命令:检查配置文件是否正确

在读取了spec命令输出后,我们发现有2个配置项,其中只有count是必须的,那我们可以创建一个配置文件

{
    "count": 2
}

然后运行check命令来检查是否正确:

docker run -v /tmp:/tmp --rm airbyte/source-faker:0.1.0 check --config /tmp/config.json

得到输出:

{"type": "LOG", "log": {"level": "INFO", "message": "Check succeeded"}}
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}}

discover 命令:发现数据源中的表结构信息

docker run -v /tmp:/tmp --rm airbyte/source-faker:0.1.0 discover --config /tmp/config.json

我们可以得到该数据源的所有表的信息(这个例子只有一个,但是别的数据源可能有多个):

6

我们可以看到这个connector输出一个 Users表,并且这个连接器支持两种更新方式:全量和增量

read 命令:真正开始逐条读取数据源的数据

read命令除了之前的 –config 参数, 还需要指定一个 –catalog 来指定要读取哪些表和列,比如定义如下 catalog.json 文件:

7

我们运行 read 命令:

docker run -v /tmp:/tmp --rm airbyte/source-faker:0.1.0 read --config /tmp/config.json --catalog catalog.json

然后命令行上就会有3行的打印输出(我们配置Faker生成2条数据):

8

其中前两条是数据 (JSON中type是 RECORD),最后一条是 STATE, 其是和增量更新有关的,当记录了这个信息后,下次的增量更新就可以从这个记录继续了。

Python CDK 来减轻编写connector的负担

虽然 spec + check + discover + read的拆分很科学,但是这个也增加了编写connector的难度(要实现上面命令)。为了缓解,Airbyte对于Python提供了 Connector-Development Kit (CDK)。会辅助生成大量的代码,这样connector编写者只需要关注最核心的业务逻辑即可。

其它一些话题

Airbyte 集成了 dbt,到底哪里集成的

Airbyte的理念是主要做Extract (Source) + Load (Destination),所以,中间的逻辑要尽可能简单。当写入目标后,其实是更原始的格式,比如:对于上面示例的 “Faker -> ClickHouse”连接,在ClickHouse中创建的目标表其实是如下内容:

9

数据是较“原汁原味”的保存到了数仓中,虽然是以JSON列的形式保存的。要分析其数据,一般还需要调用类似 get_json_object 等函数来把这些JSON转为更多列。

为了简化这个步骤,对于一些常见数仓,airbyte支持“Basic Normalization”的操作,这步骤就是借助了dbt来为这些数仓做转化。

即:如果在connection中配置了最后要做 “Normalization”,那在Sync好数据后,会调用dbt来做数据转换。

Airbyte 如果单机下是借用的“Unix管道”,那Kubernetes下是怎么做的

最早airbyte只支持单机的 docker-compose 来把 source 和 destination 用管道串起来。 但是,后来当扩展到Kubernetes时,遇到了如何把数据从 Source 传递到 Destination的故事,其借助了 socat & sidecar 等技术来完成类似单机的功能。文章本身挺有意思的,当然这种实现有很多问题,Airbyte团队也在改用新的实现。

细节是魔鬼,当调度和运行大量的 Connections 时,挑战很大。

我们看到Airbyte的原理看似很简单, 只要定义好 Source 和 Destination的规范,然后似乎把Source和Destination放到一起就完事了。

但是其实上,如果要稳定成功的调度起大量的同步任务 (connections),那是非常难的。Airbyte的官方blog有篇很棒的文章做了介绍(值得一看): 《How we scale workflow orchestration with Temporal》

License修改

随着Airbyte的流行,Airbyte为了避免竞争,在去年做了License修改,

Airbyte Licensing Overview

  • Airbyte Connectors are open sourced and available under the MIT License.
  • Airbyte Protocol is open sourced and available under the MIT License.
  • Airbyte CDK (Connector Development Kit) is open sourced and available under the MIT License.
  • Airbyte Core is licensed under the Elastic License 2.0 (ELv2).
  • Airbyte Cloud & Airbyte Enterprise are both closed source and require a commercial license from Airbyte.

简单描述一下:之前代码(最后一个MIT版本是v0.29.22-alpha)都是MIT开源协议,但是后面的,大多数运行相关的代码都改为有很多限制的 Elastic License 2.0 协议了。

不过幸运的是: 最最有价值的 Connectors 以及其协议,方便编写Connector的CDK都仍然还是 MIT 协议!

Airbyte Connector & Business Intelligence

以观远的BI举例,我们支持从文件、数据库中导入数据到内置的DeltaLake中,这个其实和 Airbyte 的 Source -> Destination 是同一回事。我们是否能也支持Airbyte的connectors呢? :)

参考资料

Airbyte:一家开源“数据集成”公司

Post Directory