工具:将kafka消息沉入数据库表
项目描述
dbsink
从kafka主题读取并沉入数据库表,每条消息一行。
这与Kafka Connect JdbcConnector类似。这个项目门槛更低,不需要深入研究Kafka Connect生态系统。我用自定义JdbcConnector实现了这个项目,但它变得难以控制,基本上不可测试。所以,这就是我们所在的位置。
您可以选择将数据解包为avro
、msgpack
或默认的json
。avro
需要一个额外的registry
参数。
Docker镜像:https://hub.docker.com/r/axiom/dbsink/builds
为什么?
我需要从定义良好的kafka主题中读取并将在数据库表中存储结果,以便协作者能够以更熟悉的方式与数据交互。
它也是一个非常方便且易于设置的PostgREST,可以在生成的表上快速获取只读REST API。
将消息映射到表
您可以使用Python类定义消息和表之间的自定义映射。您可以将自定义映射注册到dbsink.maps
入口点,以便在运行时提供给dbsink
。
entry_points = {
'dbsink.maps': [
'YourCustomMap = you.custom.map.module:CustomMapClass',
# ...
]
}
自定义映射类应该继承自dbsink
中的BaseMap
类,并根据需要重写以下函数
-
upsert_constraint_name
- 用于插入数据的约束名称。设置为None
以禁用插入。在创建表上的插入约束时使用此类属性(请参阅下面的示例)。 -
unique_index_name
- 基于表名的唯一索引名称。如果要在表中定义单个唯一索引,请使用此名称。 -
sequence_name
- 基于表名的唯一序列名称。如果要在表中定义单个序列列,请使用此名称。 -
_check_key
- 在尝试下压之前检查消息的key
是否有效。如果有效则返回True
,否则抛出错误。 -
_check_value
- 在尝试下压之前检查消息的value
是否有效。如果有效则返回True
,否则抛出错误。 -
schema
- 一个包含 SQLAlchmy Column、Index 和 Constraint 架构定义的列表,用于表创建和更新。这完全描述了您的表架构。 -
message_to_values
- 接受key
和value
参数的函数,并返回一个元组key, dict
,其中字典是传递给 SQLAlchemy 的insert().values
方法的values
。如果指定了avro
或msgpack
打包,则此函数的value
参数已经解包。insert(table).values( # dict_returned_ends_up_here )
示例
一个简单的示例是 dbsink
包含的 StringMap
映射。
from datetime import datetime
import pytz
import sqlalchemy as sql
import simplejson as json
from dbsink.maps import BaseMap
class StringMap(BaseMap):
@property
def upsert_constraint_name(self):
return None # Ignore upserts
def _check_key(self, key):
return True # All keys are valid
def _check_value(self, value):
# Make sure values are JSON parsable
_ = json.loads(json.dumps(value, ignore_nan=True))
return True
@property
def schema(self):
return [
sql.Column('id', sql.Integer, sql.Sequence(self.sequence_name), primary_key=True),
sql.Column('sinked', sql.DateTime(timezone=True), index=True),
sql.Column('key', sql.String, default='', index=True),
sql.Column('payload', sql.String)
]
def message_to_values(self, key, value):
# Raises if invalid. This calls `._check_key` and `._check_value`
self.check(key, value)
values = {
'sinked': datetime.utcnow().replace(tzinfo=pytz.utc).isoformat(),
'key': key,
'payload': json.dumps(value),
}
return key, values
高级示例
对表架构或如何将消息数据映射到架构没有限制。以下示例使用了一个 PostGIS
列。
from datetime import datetime
import pytz
import sqlalchemy as sql
import simplejson as json
from shapely.geometry import shape
from geoalchemy2.types import Geography
from dbsink.maps import BaseMap
class NamedGenericGeography(BaseMap):
def _check_key(self, key):
return True # All keys are valid
def _check_value(self, value):
# Make sure values are JSON parsable
_ = json.loads(json.dumps(value, ignore_nan=True))
return True
@property
def schema(self):
return [
sql.Column('id', sql.Integer, sql.Sequence(self.sequence_name), primary_key=True),
sql.Column('name', sql.String, default='', index=True),
sql.Column('time', sql.DateTime(timezone=True), index=True),
sql.Column('geom', Geography(srid=4326)),
sql.Index(
self.unique_index_name,
'name',
'time',
unique=True,
),
sql.UniqueConstraint(
'name',
'time',
name=self.upsert_constraint_name
)
]
def message_to_values(self, key, value):
""" Assumes a message format of
{
"time": 1000000000, # unix epoch
"name": "my cool thing",
"geojson": {
"geometry": {
"type": "Polygon",
"coordinates": [ [ [ -118.532116484818843, 32.107425500492766 ], [ -118.457544847012443, 32.107425500492702 ], [ -118.457544847012443, 32.054517056541435 ], [ -118.532116484818872, 32.054517056541464 ], [ -118.532116484818843, 32.107425500492766 ] ] ]
}
}
}
"""
# Raises if invalid
self.check(key, value)
# GeoJSON `geometry` attribute to WKT
geometry = shape(value['geojson']['geometry']).wkt
values = {
'name': value['name']
'time': datetime.fromtimestamp(value['time'], pytz.utc).isoformat()
'geom': geometry
}
return key, values
配置
此程序使用 Click
作为 CLI 接口。有关所有选项,请使用 help
。
$ dbsink --help
环境变量
所有配置选项都可以使用模式 DBSINK_[argument_name]=[value]
通过环境变量指定。有关更多信息,请参阅 click 文档。
DBSINK_TOPIC="topic-to-listen-to" \
DBSINK_LOOKUP="StringMap" \
DBSINK_TABLE="MyCoolTable" \
DBSINK_CONSUMER="myconsumer" \
DBSINK_PACKING="msgpack" \
DBSINK_OFFSET="earlist" \
DBSINK_DROP="true" \
DBSINK_VERBOSE="1" \
dbsink
测试
您可以使用 pytest
运行测试。要运行集成测试,启动一个数据库,使用以下命令: docker run -p 30300:5432 --name dbsink-int-testing-db -e POSTGRES_USER=sink -e POSTGRES_PASSWORD=sink -e POSTGRES_DB=sink -d mdillon/postgis:11
,然后运行 pytest -m integration
项目详情
下载文件
下载您平台的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。