跳转到主要内容

工具:将kafka消息沉入数据库表

项目描述

dbsink

从kafka主题读取并沉入数据库表,每条消息一行。

这与Kafka Connect JdbcConnector类似。这个项目门槛更低,不需要深入研究Kafka Connect生态系统。我用自定义JdbcConnector实现了这个项目,但它变得难以控制,基本上不可测试。所以,这就是我们所在的位置。

您可以选择将数据解包为avromsgpack或默认的jsonavro需要一个额外的registry参数。

Docker镜像:https://hub.docker.com/r/axiom/dbsink/builds

为什么?

我需要从定义良好的kafka主题中读取并将在数据库表中存储结果,以便协作者能够以更熟悉的方式与数据交互。

它也是一个非常方便且易于设置的PostgREST,可以在生成的表上快速获取只读REST API。

将消息映射到表

您可以使用Python类定义消息和表之间的自定义映射。您可以将自定义映射注册到dbsink.maps入口点,以便在运行时提供给dbsink

entry_points = {
    'dbsink.maps': [
        'YourCustomMap    = you.custom.map.module:CustomMapClass',
        # ...
    ]
}

自定义映射类应该继承自dbsink中的BaseMap类,并根据需要重写以下函数

  • upsert_constraint_name - 用于插入数据的约束名称。设置为None以禁用插入。在创建表上的插入约束时使用此类属性(请参阅下面的示例)。

  • unique_index_name - 基于表名的唯一索引名称。如果要在表中定义单个唯一索引,请使用此名称。

  • sequence_name - 基于表名的唯一序列名称。如果要在表中定义单个序列列,请使用此名称。

  • _check_key - 在尝试下压之前检查消息的 key 是否有效。如果有效则返回 True,否则抛出错误。

  • _check_value - 在尝试下压之前检查消息的 value 是否有效。如果有效则返回 True,否则抛出错误。

  • schema - 一个包含 SQLAlchmy ColumnIndexConstraint 架构定义的列表,用于表创建和更新。这完全描述了您的表架构。

  • message_to_values - 接受 keyvalue 参数的函数,并返回一个元组 key, dict,其中字典是传递给 SQLAlchemy 的 insert().values 方法的 values。如果指定了 avromsgpack 打包,则此函数的 value 参数已经解包。

    insert(table).values(
      # dict_returned_ends_up_here
    )
    

示例

一个简单的示例是 dbsink 包含的 StringMap 映射。

from datetime import datetime

import pytz
import sqlalchemy as sql
import simplejson as json

from dbsink.maps import BaseMap


class StringMap(BaseMap):

    @property
    def upsert_constraint_name(self):
        return None  # Ignore upserts

    def _check_key(self, key):
        return True  # All keys are valid

    def _check_value(self, value):
        # Make sure values are JSON parsable
        _ = json.loads(json.dumps(value, ignore_nan=True))
        return True

    @property
    def schema(self):
        return [
            sql.Column('id',       sql.Integer, sql.Sequence(self.sequence_name), primary_key=True),
            sql.Column('sinked',   sql.DateTime(timezone=True), index=True),
            sql.Column('key',      sql.String, default='', index=True),
            sql.Column('payload',  sql.String)
        ]

    def message_to_values(self, key, value):
        # Raises if invalid. This calls `._check_key` and `._check_value`
        self.check(key, value)

        values = {
            'sinked':  datetime.utcnow().replace(tzinfo=pytz.utc).isoformat(),
            'key':     key,
            'payload': json.dumps(value),
        }

        return key, values

高级示例

对表架构或如何将消息数据映射到架构没有限制。以下示例使用了一个 PostGIS 列。

from datetime import datetime

import pytz
import sqlalchemy as sql
import simplejson as json
from shapely.geometry import shape
from geoalchemy2.types import Geography

from dbsink.maps import BaseMap


class NamedGenericGeography(BaseMap):

    def _check_key(self, key):
        return True  # All keys are valid

    def _check_value(self, value):
        # Make sure values are JSON parsable
        _ = json.loads(json.dumps(value, ignore_nan=True))
        return True

    @property
    def schema(self):
        return [
            sql.Column('id',       sql.Integer, sql.Sequence(self.sequence_name), primary_key=True),
            sql.Column('name',     sql.String, default='', index=True),
            sql.Column('time',     sql.DateTime(timezone=True), index=True),
            sql.Column('geom',     Geography(srid=4326)),
            sql.Index(
                self.unique_index_name,
                'name',
                'time',
                unique=True,
            ),
            sql.UniqueConstraint(
                'name',
                'time',
                name=self.upsert_constraint_name
            )
        ]

    def message_to_values(self, key, value):
        """ Assumes a message format of
        {
          "time": 1000000000, # unix epoch
          "name": "my cool thing",
          "geojson": {
            "geometry": {
              "type": "Polygon",
              "coordinates": [ [ [ -118.532116484818843, 32.107425500492766 ], [ -118.457544847012443, 32.107425500492702 ], [ -118.457544847012443, 32.054517056541435 ], [ -118.532116484818872, 32.054517056541464 ], [ -118.532116484818843, 32.107425500492766 ] ] ]
            }
          }
        }
        """
        # Raises if invalid
        self.check(key, value)

        # GeoJSON `geometry` attribute to WKT
        geometry = shape(value['geojson']['geometry']).wkt

        values = {
            'name': value['name']
            'time': datetime.fromtimestamp(value['time'], pytz.utc).isoformat()
            'geom': geometry
        }

        return key, values

配置

此程序使用 Click 作为 CLI 接口。有关所有选项,请使用 help

$ dbsink --help

环境变量

所有配置选项都可以使用模式 DBSINK_[argument_name]=[value] 通过环境变量指定。有关更多信息,请参阅 click 文档

DBSINK_TOPIC="topic-to-listen-to" \
DBSINK_LOOKUP="StringMap" \
DBSINK_TABLE="MyCoolTable" \
DBSINK_CONSUMER="myconsumer" \
DBSINK_PACKING="msgpack" \
DBSINK_OFFSET="earlist" \
DBSINK_DROP="true" \
DBSINK_VERBOSE="1" \
    dbsink

测试

您可以使用 pytest 运行测试。要运行集成测试,启动一个数据库,使用以下命令: docker run -p 30300:5432 --name dbsink-int-testing-db -e POSTGRES_USER=sink -e POSTGRES_PASSWORD=sink -e POSTGRES_DB=sink -d mdillon/postgis:11,然后运行 pytest -m integration

项目详情


下载文件

下载您平台的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。

源代码分发

dbsink-2.6.0.tar.gz (17.1 kB 查看哈希值)

上传时间 源代码

构建分发

dbsink-2.6.0-py3-none-any.whl (15.6 kB 查看哈希值)

上传时间 Python 3

支持者

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面