跳转到主要内容

Rainer的Python客户端

项目描述

Rainer

Rainer是一个基于名为“提交”的版本化键/值对的配置管理库。我们创建它是因为我们注意到我们有很多服务,每个服务都以略微不同的方式(带有略微不同的缺陷)管理配置。它由一组可以独立使用或作为强大配置管理系统一部分使用的API组成。除了键/值访问外,它还可以提供

  • 每个键的历史提交的完整审计跟踪,包括谁、为什么以及何时提交。
  • 检测并发修改并防止用户互相覆盖的能力。
  • 可扩展性以处理各种存储后端和提交有效负载类型。
  • 可选的基于ZooKeeper的视图和通知,用于即时集群级更新。
  • 可选的HTTP API以及JVM、Python和命令行客户端。

Rainer是用Scala编写的,本页的示例也是如此,但它也应适用于其他JVM语言(如Java)。

提交

Rainer基于名为“提交”的版本化键/值对。提交具有以下属性

  • 键,是一个字符串。
  • 有效负载,可以是任意类型。将其想象成一个简短的文档。
  • 版本,每次连续提交时必须增加1。
  • 作者,创建提交的实体。
  • 注释,描述提交的一些自由格式字符串。
  • Mtime,提交创建的时间戳。

每个键的提交与其他键的提交是分开的,并且独立进行版本控制。特别是,没有全局数据库版本的概念。

您的有效负载类型必须是可反序列化的,因此您需要为它提供一个KeyValueDeserialization。您也可以选择提供一个KeyValueSerialization。

如果您没有使用序列化器,那么在您创建一个新的提交时,您只需要提供一些可以使用您的KeyValueDeserialization进行反序列化的字节,而不是提供一个对象作为有效负载。这种模式对于您想允许人类直接编辑序列化的文档非常有用,因为它将保留注释、空格等。

用法

Rainer提供了三个可以单独使用或一起使用的服务器端组件

  • CommitStorage(持久化日志存储)
  • CommitKeeper(ZooKeeper支持的视图和通知)
  • RainerServlet(远程检查和修改的HTTP API)

如果您想同时使用这三个组件,最简单的方法是使用“Rainers”构建器,它会创建所有三个组件并将它们连接在一起

implicit val serialization = KeyValueSerialization.usingJackson[ValueType](objectMapper)
implicit val deserialization = KeyValueDeserialization.usingJackson[ValueType](objectMapper)

val db = new MySQLDB(dbConfig) with DbCommitStorageMySQLMixin
val rainers = Rainers.create[ValueType](
  curator,
  "/path/in/zk",
  new DbCommitStorage[ValueType](db, "table_name")
)

db.start()
rainers.start()

// Do what you will with these:
val myStorage: CommitStorage[ValueType] = rainers.storage
val myKeeper: CommitKeeper[ValueType] = rainers.keeper
val myServlet: RainerServlet[ValueType] = rainers.servlet

// For example:
myStorage.save(Commit.fromValue(
  key = "foo",
  version = 1,
  payload = new ValueType,
  author = "me",
  comment = "Creating foo",
  mtime = DateTime.now
))

组件

CommitStorage:持久化日志存储

CommitStorage特质表示一个键/值存储,它保留每个提交的每个版本。主要的内置实现是DbCommitStorage,它由一个关系型数据库管理系统支持。

implicit val deserialization = KeyValueDeserialization.usingJackson[ValueType](objectMapper)

val db = new MySQLDB(dbConfig) with DbCommitStorageMySQLMixin
val storage = new DbCommitStorage[ValueType](db, "table_name")

db.start()
storage.start()

// Revert "foo" to previous version
val foo = storage.get("foo")
val previousFoo = foo flatMap (x => storage.get(x.key, x.version - 1))
previousFoo foreach (x => storage.save(Commit.fromBytes(x.key, x.version + 2, x.payload, "me", "Reverting foo", DateTime.now))

CommitKeeper:ZooKeeper视图

CommitKeeper类表示存储在ZooKeeper中每个提交的最新版本。它支持按需访问、实时更新视图和即时通知。它不为每个键保留完整的历史记录;只有CommitStorage这样做。

implicit val deserialization = KeyValueDeserialization.usingJackson[ValueType](objectMapper)

val keeper = new CommitKeeper[ValueType](curator, "/path/in/zk")

// On-demand access (synchronous ZK operations):
val foo = keeper.get("foo")
val keys = keeper.keys

// Create a mirror (async updates from ZK):
val mirror: Var[Map[String, Commit[ValueType]]] = keeper.mirror()

// Which, for example, you can use to update an AtomicReference:
val ref = new AtomicReference[Map[String, Commit[ValueType]]]
val c = mirror.changes.register(Witness(ref))

// When you're no longer interested in updates, close the mirror:
Await.result(c.close())

使用CommitKeeper与CommitStorage结合

将CommitStorage与CommitKeeper结合使用是用于管理机器集群配置的常见模式。结合的配置可以提供在关系型数据库管理系统中的完整历史日志更新,并通过ZooKeeper提供低延迟的通知。想法是通过keeperPublishing CommitStorage(在您保存任何内容时都会更新ZooKeeper)进行所有保存,并通过CommitKeeper提供的镜像进行读取。为了防止不合适的崩溃或带外ZooKeeper修改导致ZooKeeper不同步,自动发布者可以定期检测并推送任何未推送的更新。

"Rainers"构建器为您设置所有这些,但您也可以使用类似的方法手动设置

// During setup, link your underlying storage and keeper together.
// Afterwards, use "storage", not "underlyingStorage" for your own operations.
val keeper = new CommitKeeper[ValueType](curator, "/path/in/zk")
val storage = CommitStorage.keeperPublishing(underlyingStorage, keeper)
val autoPublisher = keeper.autoPublisher(underlyingStorage, period, periodFuzz)
autoPublisher.start()

// When your application exits, shut down your autoPublisher.
Await.result(autoPublisher.close())

当使用链接的CommitStorage和CommitKeeper时,请确保只通过CommitStorage发送写入。这是因为CommitStorage被视为记录系统。

// You can get a mirror from your keeper, which will now reflect the most recent commits from your storage.
val mirror: Var[Map[String, Commit[ValueType]]] = keeper.mirror()

// Which, for example, you can use to update an AtomicReference:
val ref = new AtomicReference[Map[String, Commit[ValueType]]]
val c = mirror.changes.register(Witness(ref))

// When you save to the storage, ZooKeeper and the mirror will be updated automatically. (Don't save using the keeper!)
storage.save(someCommit)

在不使用CommitKeeper的情况下使用CommitStorage

您可以使用CommitStorage本身,唯一丢失的是ZooKeeper视图,这意味着您无法设置镜像。在这种情况下,您将需要按需访问底层存储,或者自己设置一个定期刷新的缓存。

在不使用CommitStorage的情况下使用CommitKeeper

您也可以单独使用CommitKeeper,而不需要任何后端日志存储。您将无法访问任何提交的旧版本。只需调用keeper.save(commit)来发布新提交,并不要使用自动发布者。其他CommitKeeper功能将正常工作,包括获取和镜像。

HTTP服务器

您可以使用RainerServlet获得一个不错的HTTP API来访问您的CommitStorage。它不使用或需要CommitKeeper。该API可以像这样与任何HTTP客户端一起使用

$ curl \
  -s \
  -XPOST \
  -H'Content-Type: application/octet-stream' \
  -H'X-Rainer-Author: gian' \
  -H'X-Rainer-Comment: rofl' \
  --data-binary '{"hey":"what"}' \
  http://localhost:8080/diary/foo/1
{"author":"gian","comment":"rofl","key":"foo","mtime":"2014-02-11T14:01:28.839-08:00","version":1}

$ curl -s http://localhost:8080/diary/foo
{"hey":"what"}

$ curl -s http://localhost:8080/diary/foo/1
{"hey":"what"}

$ curl -s http://localhost:8080/diary/foo/1/meta
{"author":"gian","key":"foo","version":1,"mtime":"2014-02-11T14:01:28.839-08:00","comment":"rofl"}

客户端

如果您有一个RainerServlet正在运行,有一些预构建的客户端可以用于远程检查和修改配置。

JVM HTTP客户端

您可以使用HttpCommitStorage来访问运行在其他机器上的RainerServlet

val uri = new URI("http://localhost:8080/diary")
val client = ClientBuilder()
  .name(uri.toString)
  .codec(Http())
  .group(Group.fromVarAddr(InetResolver.bind(uri.getAuthority)))
  .hostConnectionLimit(2)
  .build()
val storage = new HttpCommitStorage[ValueType](client, uri)

// Use it like any other CommitStorage:
val commit = storage.get("hey")

Python HTTP客户端

从Python,您可以使用"pyrainer"。您可以使用pip install pyrainer安装它,并像这样使用它

import pyrainer.http

client = pyrainer.http.RainerClient("http://localhost:8080/diary")

commits = client.list_full()
for key in sorted(commits.keys()):
  version = commits[key].version
  print "%s [current version = %s]" % (key, str(version))

hey = client.get_commit("hey")
print "Commit version %s = %s" % (hey.meta["version"], hey.value)

client.post_commit({"key": "hey", "version": 2, "author": "sue", "comment": "rofl"}, "new value")

命令行客户端

如果您已安装pyrainer(pip install pyrainer),则可以使用交互式命令行工具或在shell脚本中使用。

$ python -m pyrainer.rainer --url http://localhost:8080/diary list
foo     1       http://localhost:8080/diary/foo/1
hey     6       http://localhost:8080/diary/hey/6

$ python -m pyrainer.rainer --url http://localhost:8080/diary show hey
{"hey":"whatsit"}

$ python -m pyrainer.rainer --url http://localhost:8080/diary edit hey
[...a wild $EDITOR appears!]

您还可以为您的服务创建一个包装器,以简化调用。以下类型的可执行脚本应该可以做到这一点

#!/bin/sh -e
exec python -m pyrainer.rainer --url "http://example.com/foo" "$@"

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源代码分发

pyrainer-0.25.tar.gz (14.0 kB 查看哈希值)

上传时间 源代码

构建分发

pyrainer-0.25-py3-none-any.whl (11.2 kB 查看哈希值)

上传时间 Python 3

由以下支持