跳转到主要内容

Redis for Humans.

项目描述

Pottery: Redis for Humans 🌎🌍🌏

Redis 很棒,但 Redis命令 并非总是直观。Pottery 是一种访问Redis的Pythonic方法。如果您知道如何使用Python字典,那么您就已经知道如何使用Pottery。Pottery 可以让您更轻松地访问Redis,并用于实现微服务弹性模式;它已在规模生产中得到实战检验。

Build status Security status Latest released version

Supported Python versions Number of lines of code

Total number of downloads Downloads per month Downloads per week

目录

安装

$ pip3 install pottery

使用

首先,设置您的Redis客户端

>>> from redis import Redis
>>> redis = Redis.from_url('redis://localhost:6379/1')
>>>

字典 📖

RedisDict 是一个与Python的 dict 兼容的Redis后端容器。

以下是一个使用 RedisDict 的小例子

>>> from pottery import RedisDict
>>> tel = RedisDict({'jack': 4098, 'sape': 4139}, redis=redis, key='tel')
>>> tel['guido'] = 4127
>>> tel
RedisDict{'jack': 4098, 'sape': 4139, 'guido': 4127}
>>> tel['jack']
4098
>>> del tel['sape']
>>> tel['irv'] = 4127
>>> tel
RedisDict{'jack': 4098, 'guido': 4127, 'irv': 4127}
>>> list(tel)
['jack', 'guido', 'irv']
>>> sorted(tel)
['guido', 'irv', 'jack']
>>> 'guido' in tel
True
>>> 'jack' not in tel
False
>>>

注意 RedisDict() 的前两个关键字参数:第一个是您的Redis客户端。第二个是您的字典的Redis键名。除此之外,您可以使用 RedisDict 与任何其他Python dict 一样。

限制

  1. 键和值必须是JSON可序列化的。

集合 🛍️

RedisSet 是一个与Python的 set 兼容的Redis后端容器。

以下是一个简要的演示

>>> from pottery import RedisSet
>>> basket = RedisSet({'apple', 'orange', 'apple', 'pear', 'orange', 'banana'}, redis=redis, key='basket')
>>> sorted(basket)
['apple', 'banana', 'orange', 'pear']
>>> 'orange' in basket
True
>>> 'crabgrass' in basket
False

>>> a = RedisSet('abracadabra', redis=redis, key='magic')
>>> b = set('alacazam')
>>> sorted(a)
['a', 'b', 'c', 'd', 'r']
>>> sorted(a - b)
['b', 'd', 'r']
>>> sorted(a | b)
['a', 'b', 'c', 'd', 'l', 'm', 'r', 'z']
>>> sorted(a & b)
['a', 'c']
>>> sorted(a ^ b)
['b', 'd', 'l', 'm', 'r', 'z']
>>>

注意 RedisSet() 的两个关键字参数:第一个是您的Redis客户端。第二个是您集合的Redis键名。除此之外,您可以使用您的 RedisSet 与其他任何Python set 一样。

使用 .contains_many() 进行更高效的多个元素成员资格测试

>>> nirvana = RedisSet({'kurt', 'krist', 'dave'}, redis=redis, key='nirvana')
>>> tuple(nirvana.contains_many('kurt', 'krist', 'chat', 'dave'))
(True, True, False, True)
>>>

限制

  1. 元素必须是JSON可序列化的。

列表 ⛓

RedisList 是一个与Python的 list 兼容的Redis后端容器。

>>> from pottery import RedisList
>>> squares = RedisList([1, 4, 9, 16, 25], redis=redis, key='squares')
>>> squares
RedisList[1, 4, 9, 16, 25]
>>> squares[0]
1
>>> squares[-1]
25
>>> squares[-3:]
[9, 16, 25]
>>> squares[:]
[1, 4, 9, 16, 25]
>>> squares + [36, 49, 64, 81, 100]
RedisList[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
>>>

注意 RedisList() 的两个关键字参数:第一个是您的Redis客户端。第二个是您列表的Redis键名。除此之外,您可以使用您的 RedisList 与其他任何Python list 一样。

限制

  1. 元素必须是JSON可序列化的。
  2. 在底层,Python使用数组实现 list,Redis使用双链表实现列表。因此,在 RedisList 的头部或尾部插入元素是快速的,O(1)。然而,通过索引访问 RedisList 元素是慢的,O(n)。因此,在性能和理想用例方面,RedisList 与Python的 deque 更相似,而不是Python的 list。与其使用 RedisList,不如考虑使用 RedisDeque

计数器 🧮

RedisCounter 是一个与Python的 collections.Counter 兼容的Redis后端容器。

>>> from pottery import RedisCounter
>>> c = RedisCounter(redis=redis, key='my-counter')
>>> c = RedisCounter('gallahad', redis=redis, key='my-counter')
>>> c.clear()
>>> c = RedisCounter({'red': 4, 'blue': 2}, redis=redis, key='my-counter')
>>> c.clear()
>>> c = RedisCounter(redis=redis, key='my-counter', cats=4, dogs=8)
>>> c.clear()

>>> c = RedisCounter(['eggs', 'ham'], redis=redis, key='my-counter')
>>> c['bacon']
0
>>> c['sausage'] = 0
>>> del c['sausage']
>>> c.clear()

>>> c = RedisCounter(redis=redis, key='my-counter', a=4, b=2, c=0, d=-2)
>>> sorted(c.elements())
['a', 'a', 'a', 'a', 'b', 'b']
>>> c.clear()

>>> RedisCounter('abracadabra', redis=redis, key='my-counter').most_common(3)
[('a', 5), ('b', 2), ('r', 2)]
>>> c.clear()

>>> c = RedisCounter(redis=redis, key='my-counter', a=4, b=2, c=0, d=-2)
>>> from collections import Counter
>>> d = Counter(a=1, b=2, c=3, d=4)
>>> c.subtract(d)
>>> c
RedisCounter{'a': 3, 'b': 0, 'c': -3, 'd': -6}
>>>

注意 RedisCounter() 的前两个关键字参数:第一个是您的Redis客户端。第二个是您计数器的Redis键名。除此之外,您可以使用您的 RedisCounter 与其他任何Python Counter 一样。

限制

  1. 键必须是JSON可序列化的。

双端队列 🖇️

RedisDeque 是一个与Python的 collections.deque 兼容的Redis后端容器。

示例

>>> from pottery import RedisDeque
>>> d = RedisDeque('ghi', redis=redis, key='letters')
>>> for elem in d:
...     print(elem.upper())
G
H
I

>>> d.append('j')
>>> d.appendleft('f')
>>> d
RedisDeque(['f', 'g', 'h', 'i', 'j'])

>>> d.pop()
'j'
>>> d.popleft()
'f'
>>> list(d)
['g', 'h', 'i']
>>> d[0]
'g'
>>> d[-1]
'i'

>>> list(reversed(d))
['i', 'h', 'g']
>>> 'h' in d
True
>>> d.extend('jkl')
>>> d
RedisDeque(['g', 'h', 'i', 'j', 'k', 'l'])
>>> d.rotate(1)
>>> d
RedisDeque(['l', 'g', 'h', 'i', 'j', 'k'])
>>> d.rotate(-1)
>>> d
RedisDeque(['g', 'h', 'i', 'j', 'k', 'l'])

>>> RedisDeque(reversed(d), redis=redis)
RedisDeque(['l', 'k', 'j', 'i', 'h', 'g'])
>>> d.clear()

>>> d.extendleft('abc')
>>> d
RedisDeque(['c', 'b', 'a'])
>>>

注意 RedisDeque() 的两个关键字参数:第一个是您的Redis客户端。第二个是您双端队列的Redis键名。除此之外,您可以使用您的 RedisDeque 与其他任何Python deque 一样。

限制

  1. 元素必须是JSON可序列化的。

队列 🚶‍♂️🚶‍♀️🚶‍♂️

RedisSimpleQueue 是一个与Python的 queue.SimpleQueue 兼容的Redis后端多生产者、多消费者FIFO队列。通常,如果您在一个或多个线程中使用它,请使用Python queue.Queue;如果您在进程间使用它,请使用 multiprocessing.Queue;如果您需要跨机器共享它,或者如果需要您的队列在应用程序崩溃或重启后持久化,请使用 RedisSimpleQueue

实例化一个 RedisSimpleQueue

>>> from pottery import RedisSimpleQueue
>>> cars = RedisSimpleQueue(redis=redis, key='cars')
>>>

注意 RedisSimpleQueue() 的两个关键字参数:第一个是您的Redis客户端。第二个是您队列的Redis键名。除此之外,您可以使用您的 RedisSimpleQueue 与其他任何Python queue.SimpleQueue 一样。

检查队列状态,向队列中放入一些项目,并取出这些项目

>>> cars.empty()
True
>>> cars.qsize()
0
>>> cars.put('Jeep')
>>> cars.put('Honda')
>>> cars.put('Audi')
>>> cars.empty()
False
>>> cars.qsize()
3
>>> cars.get()
'Jeep'
>>> cars.get()
'Honda'
>>> cars.get()
'Audi'
>>> cars.empty()
True
>>> cars.qsize()
0
>>>

限制

  1. 项目必须是JSON可序列化的。

Redlock 🔒

Redlock 是一个安全且可靠的锁,用于协调跨线程、进程甚至机器共享资源的访问,没有单点故障。有关理由和算法描述。

Redlock尽可能地实现了Python的出色threading.Lock API。换句话说,你可以像使用threading.Lock一样使用Redlock。使用Redlock而不是threading.Lock的主要原因是Redlock可以协调跨不同机器共享资源的访问;threading.Lock则不能。

创建一个Redlock

>>> from pottery import Redlock
>>> printer_lock = Redlock(key='printer', masters={redis})
>>>

key参数表示资源,masters参数指定了要分布锁的Redis主节点。在生产环境中,你应该有5个Redis主节点。这是为了消除单点故障——即使丢失了5个Redis主节点中的2个,你的Redlock仍然可用且性能良好。现在你可以保护对资源的访问了

>>> if printer_lock.acquire():
...     print('printer_lock is locked')
...     # Critical section - print stuff here.
...     printer_lock.release()
printer_lock is locked
>>> bool(printer_lock.locked())
False
>>>

或者你可以在上下文管理器内保护对资源的访问

>>> with printer_lock:
...     print('printer_lock is locked')
...     # Critical section - print stuff here.
printer_lock is locked
>>> bool(printer_lock.locked())
False
>>>

每次需要保护资源时创建一个新的Redlock对象,并且不要在不同部分的代码中共享Redlock实例是最安全的。换句话说,将key视为标识资源;不要将任何特定的Redlock视为标识资源。每次需要锁时创建一个新的Redlock可以避免由于将如何使用Redlock与应用程序/服务的forking/threading模型解耦而产生的错误。

Redlock会自动释放(默认情况下,在10秒后)。你应该确保关键部分在超时内完成。自动释放Redlock的原因是保持“活性”并避免死锁(如果在释放锁之前进程在关键部分中死亡)。

>>> import time
>>> if printer_lock.acquire():
...     # Critical section - print stuff here.
...     time.sleep(10)
>>> bool(printer_lock.locked())
False
>>>

如果10秒不足以完成执行关键部分,则可以指定自己的自动释放时间(以秒为单位)

>>> printer_lock = Redlock(key='printer', masters={redis}, auto_release_time=15)
>>> if printer_lock.acquire():
...     # Critical section - print stuff here.
...     time.sleep(10)
>>> bool(printer_lock.locked())
True
>>> time.sleep(5)
>>> bool(printer_lock.locked())
False
>>>

默认情况下,.acquire()会无限期地阻塞,直到获取锁。你可以通过指定blocking参数使.acquire()立即返回。如果获取了锁,则.acquire()返回True;如果没有获取,则返回False

>>> printer_lock_1 = Redlock(key='printer', masters={redis})
>>> printer_lock_2 = Redlock(key='printer', masters={redis})
>>> printer_lock_1.acquire(blocking=False)
True
>>> printer_lock_2.acquire(blocking=False)  # Returns immediately.
False
>>> printer_lock_1.release()
>>>

你可以通过指定timeout参数(以秒为单位)使.acquire()阻塞,但不无限期

>>> printer_lock_1.acquire(timeout=1)
True
>>> printer_lock_2.acquire(timeout=1)  # Waits 1 second.
False
>>> printer_lock_1.release()
>>>

你可以在初始化Redlock上下文管理器时配置其阻塞/超时行为。如果上下文管理器无法获取锁,则抛出QuorumNotAchieved异常。

>>> import contextlib
>>> from pottery import QuorumNotAchieved
>>> printer_lock_1 = Redlock(key='printer', masters={redis}, context_manager_blocking=True, context_manager_timeout=0.2)
>>> printer_lock_2 = Redlock(key='printer', masters={redis}, context_manager_blocking=True, context_manager_timeout=0.2)
>>> with printer_lock_1:
...     with contextlib.suppress(QuorumNotAchieved):
...         with printer_lock_2:  # Waits 0.2 seconds; raises QuorumNotAchieved.
...             pass
...     print(f"printer_lock_1 is {'locked' if printer_lock_1.locked() else 'unlocked'}")
...     print(f"printer_lock_2 is {'locked' if printer_lock_2.locked() else 'unlocked'}")
printer_lock_1 is locked
printer_lock_2 is unlocked
>>>

同步() 👯‍♀️

synchronize()是一个装饰器,它允许一次只有一个线程执行一个函数。在底层,synchronize()使用Redlock,因此请参阅Redlock文档以获取更多信息。

这是如何使用synchronize()的方法

>>> from pottery import synchronize
>>> @synchronize(key='synchronized-func', masters={redis}, auto_release_time=.5, blocking=True, timeout=-1)
... def func():
...   # Only one thread can execute this function at a time.
...   return True
...
>>>

NextId 🔢

NextId可以安全可靠地在线程、进程甚至机器之间生成递增的ID,没有单点故障。请参阅动机和算法描述

实例化一个ID生成器

>>> from pottery import NextId
>>> tweet_ids = NextId(key='tweet-ids', masters={redis})
>>>

key参数表示序列(因此你可以为用户ID、评论ID等拥有不同的序列),masters参数指定了要分布ID生成的Redis主节点(在生产环境中,你应该有5个Redis主节点)。现在,每次需要用户ID时,都可以在ID生成器上调用next()

>>> next(tweet_ids)
1
>>> next(tweet_ids)
2
>>> next(tweet_ids)
3
>>>

有两个注意事项

  1. 如果许多客户端正在并发生成ID,那么ID序列可能会有“空洞”(例如:1,2,6,10,11,21,……)。
  2. 此算法可以扩展到每秒约5,000个ID(使用5个Redis主节点)。如果你需要比这更快的ID,那么你可能需要考虑其他技术。

redis_cache()

redis_cache() 是一个简单的轻量级无界函数返回值缓存,有时也称为 “memoize”redis_cache() 尽可能地实现了 Python 的优秀 functools.cache() API。换句话说,你可以像使用 functools.cache() 一样使用 redis_cache()

限制

  1. 函数的参数必须是可哈希的。
  2. 函数的返回值必须是可序列化为 JSON 的。
  3. functools.cache() 一样,redis_cache() 不允许最大大小,也不会驱逐旧值,它会无界增长。仅在以下情况下使用 redis_cache()
    1. 你的函数参数空间具有已知的小基数。
    2. 当调用 redis_cache() 来装饰你的函数时,指定一个 timeout,以便在最后一次缓存访问(命中或未命中)后的 timeout 秒钟删除你的 整个 返回值缓存。
    3. 你定期调用 .cache_clear() 来删除你的 整个 返回值缓存。
    4. 你接受返回值缓存无界增长,并且你 了解这对你的底层 Redis 实例的影响

一般来说,你应该只在想要重用之前计算过的值时使用 redis_cache()。因此,没有必要缓存具有副作用或不纯函数(如 time()random())的函数。

装饰一个函数

>>> import time
>>> from pottery import redis_cache
>>> @redis_cache(redis=redis, key='expensive-function-cache')
... def expensive_function(n):
...     time.sleep(1)  # Simulate an expensive computation or database lookup.
...     return n
...
>>>

注意 redis_cache() 的两个关键字参数:第一个是你的 Redis 客户端。第二个是你函数返回值缓存的 Redis 键名。

调用你的函数并观察缓存命中/未命中率

>>> expensive_function(5)
5
>>> expensive_function.cache_info()
CacheInfo(hits=0, misses=1, maxsize=None, currsize=1)
>>> expensive_function(5)
5
>>> expensive_function.cache_info()
CacheInfo(hits=1, misses=1, maxsize=None, currsize=1)
>>> expensive_function(6)
6
>>> expensive_function.cache_info()
CacheInfo(hits=1, misses=2, maxsize=None, currsize=2)
>>>

注意,第一次调用 expensive_function() 需要 1 秒并导致缓存未命中;但第二次调用几乎立即返回并导致缓存命中。这是因为第一次调用后,redis_cache() 缓存了当 n == 5 时调用的返回值。

你可以通过 expensive_function.__wrapped__ 访问原始未装饰的底层 expensive_function()。这对于内省、绕过缓存或使用不同的缓存重新包装原始函数很有用。

你可以使用 expensive_function.__bypass__(*args, **kwargs) 强制为特定的 args/kwargs 组合重置缓存。调用 expensive_function.__bypass__(*args, **kwargs) 绕过缓存查找,调用原始底层函数,然后为将来的 expensive_function(*args, **kwargs) 调用缓存结果。请注意,调用 expensive_function.__bypass__(*args, **kwargs) 既不产生缓存命中也不产生缓存未命中。

最后,使用 expensive_function.cache_clear() 清除/使你的函数的整个返回值缓存无效。

>>> expensive_function.cache_info()
CacheInfo(hits=1, misses=2, maxsize=None, currsize=2)
>>> expensive_function.cache_clear()
>>> expensive_function.cache_info()
CacheInfo(hits=0, misses=0, maxsize=None, currsize=0)
>>>

CachedOrderedDict

通过一个示例用例来解释 CachedOrderedDict 是最好的方式。想象一下,你的搜索引擎返回文档 ID,然后你必须通过数据库将其转换为完整的文档以返回给客户端。用于表示此类搜索结果的数据结构必须具有以下属性

  1. 它必须保留搜索引擎返回的文档 ID 的顺序。
  2. 它必须将文档 ID 映射到填充的文档。
  3. 它必须缓存之前填充的文档。

属性 1 和 2 由 Python 的 collections.OrderedDict 满足。然而,CachedOrderedDict 扩展了 Python 的 OrderedDict 以满足属性 3。

CachedOrderedDict 最常见的使用模式如下

  1. 使用你必须在初始化器中传递给 dict_keys 参数的 IDs 实例化 CachedOrderedDict
  2. 计算并存储缓存未命中以供未来查找。
  3. 将你的 CachedOrderedDict 的某些表示返回给客户端。

实例化一个 CachedOrderedDict

>>> from pottery import CachedOrderedDict
>>> search_results_1 = CachedOrderedDict(
...     redis_client=redis,
...     redis_key='search-results',
...     dict_keys=(1, 2, 3, 4, 5),
... )
>>>

初始化参数中的 redis_client 是您的 Redis 客户端,而 redis_key 参数是 Redis 哈希表背后的 Redis 键。dict_keys 参数表示要查找并自动填充到您的 CachedOrderedDict(在缓存命中时)的有序可迭代键,或者您将不得不为未来的查找(在缓存未命中时)计算和填充的键。无论键是缓存命中还是未命中,CachedOrderedDict 都会保留 dict_keys 的顺序(类似于列表),将这些键映射到值(类似于字典),并维护一个用于未来键查找的底层缓存。

一开始,缓存是空的,所以让我们填充它

>>> sorted(search_results_1.misses())
[1, 2, 3, 4, 5]
>>> search_results_1[1] = 'one'
>>> search_results_1[2] = 'two'
>>> search_results_1[3] = 'three'
>>> search_results_1[4] = 'four'
>>> search_results_1[5] = 'five'
>>> sorted(search_results_1.misses())
[]
>>>

注意,CachedOrderedDict 保留了 dict_keys 的顺序

>>> for key, value in search_results_1.items():
...     print(f'{key}: {value}')
1: one
2: two
3: three
4: four
5: five
>>>

现在,让我们看看缓存命中和未命中的组合

>>> search_results_2 = CachedOrderedDict(
...     redis_client=redis,
...     redis_key='search-results',
...     dict_keys=(2, 4, 6, 8, 10),
... )
>>> sorted(search_results_2.misses())
[6, 8, 10]
>>> search_results_2[2]
'two'
>>> search_results_2[6] = 'six'
>>> search_results_2[8] = 'eight'
>>> search_results_2[10] = 'ten'
>>> sorted(search_results_2.misses())
[]
>>> for key, value in search_results_2.items():
...     print(f'{key}: {value}')
2: two
4: four
6: six
8: eight
10: ten
>>>

限制

  1. 键和值必须是JSON可序列化的。

布隆过滤器 🌸

布隆过滤器是一种强大的数据结构,可以帮助您回答问题:“我之前是否见过这个元素?”和“我见过多少个不同的元素?”但无法回答问题:“我之前都见过哪些元素?”所以将布隆过滤器视为可以添加元素、用于测试元素成员资格并获取长度的 Python 集合;但是您无法遍历或取出元素。

布隆过滤器是概率性的,这意味着它们有时可能会产生假阳性(例如,它们可能会报告您之前见过一个特定的元素,尽管您没有)。但它们永远不会产生假阴性(所以每次它们报告您之前没有见过一个特定的元素时,您确实从未见过它)。您可以通过调整您可接受的假阳性概率来调整,但这将以布隆过滤器的存储大小和元素插入/查找时间为代价。

创建一个 BloomFilter

>>> from pottery import BloomFilter
>>> dilberts = BloomFilter(
...     num_elements=100,
...     false_positives=0.01,
...     redis=redis,
...     key='dilberts',
... )
>>>

在这里,num_elements 代表您预计将插入到 BloomFilter 中的元素数量,而 false_positives 代表您可接受的假阳性概率。使用这两个参数,BloomFilter 会自动计算其存储大小和元素插入/查找时运行哈希函数的次数,以确保在您将插入指定数量的元素的情况下,可以保证假阳性率在您可容忍的范围内。

将元素插入到 BloomFilter

>>> dilberts.add('rajiv')
>>>

测试 BloomFilter 中的成员资格

>>> 'rajiv' in dilberts
True
>>> 'raj' in dilberts
False
>>> 'dan' in dilberts
False
>>>

看看我们已插入到 BloomFilter 中的元素数量

>>> len(dilberts)
1
>>>

请注意,BloomFilter.__len__() 是一个近似值,而不是精确值,尽管它相当准确。

将多个元素插入到 BloomFilter

>>> dilberts.update({'raj', 'dan'})
>>>

使用 .contains_many() 进行更高效的多个元素成员资格测试

>>> tuple(dilberts.contains_many('rajiv', 'raj', 'dan', 'luis'))
(True, True, True, False)
>>>

BloomFilter 中移除所有元素

>>> dilberts.clear()
>>> len(dilberts)
0
>>>

限制

  1. 元素必须是JSON可序列化的。
  2. len(bf) 是概率性的,它是一个准确的近似值。您可以通过调整 .__init__() 方法中的 num_elementsfalse_positives 参数来调整您想要的准确性,但这将以存储空间和插入/查找时间为代价。
  3. 对布隆过滤器的成员资格测试是概率性的,因为它可能会返回假阳性,但永远不会返回假阴性。这意味着如果 element in bf 评估为 True,那么您可能已将元素插入到布隆过滤器中。但如果 element in bf 评估为 False,那么您绝对没有插入该元素。再次强调,您可以通过调整 .__init__() 方法中的 num_elementsfalse_positives 参数来调整准确性,但这将以存储空间和插入/查找时间为代价。

HyperLogLogs 🪵

HyperLogLogs 是一种有趣的数据结构,旨在回答问题:“我见过多少个不同的元素?”但它无法回答问题:“我之前是否见过这个元素?”或“我之前都见过哪些元素?”所以将 HyperLogLogs 视为可以添加元素并获取长度的 Python 集合;但是您无法使用它来测试元素成员资格、遍历或取出元素。

HyperLogLogs(超日志日志)是概率性的,这意味着它们在误差范围内(最多2%)是准确的。然而,它们可以使用极小的存储空间(1.5 KB)合理准确地估计大规模数据集(如每天唯一的谷歌搜索数量)的大小。

创建一个HyperLogLog

>>> from pottery import HyperLogLog
>>> google_searches = HyperLogLog(redis=redis, key='google-searches')
>>>

将一个元素插入到HyperLogLog

>>> google_searches.add('sonic the hedgehog video game')
>>>

查看我们已插入到HyperLogLog中的元素数量

>>> len(google_searches)
1
>>>

将多个元素插入到HyperLogLog

>>> google_searches.update({
...     'google in 1998',
...     'minesweeper',
...     'joey tribbiani',
...     'wizard of oz',
...     'rgb to hex',
...     'pac-man',
...     'breathing exercise',
...     'do a barrel roll',
...     'snake',
... })
>>> len(google_searches)
10
>>>

通过一个巧妙的技巧,我们可以对HyperLogLog进行成员资格测试,尽管它从未为此目的而设计。该技巧的工作方式是创建HyperLogLog的临时副本,然后将在成员资格测试中运行的元素插入到临时副本中。如果插入更改了临时HyperLogLog的基数,则该元素肯定没有插入到原始HyperLogLog中。

>>> 'joey tribbiani' in google_searches
True
>>> 'jennifer aniston' in google_searches
False
>>>

使用 .contains_many() 进行更高效的多个元素成员资格测试

>>> tuple(google_searches.contains_many('joey tribbiani', 'jennifer aniston'))
(True, False)
>>>

HyperLogLog中删除所有元素

>>> google_searches.clear()
>>> len(google_searches)
0
>>>

限制

  1. 元素必须是JSON可序列化的。
  2. len(hll)是概率性的,因为它是一个准确的近似值。
  3. 对HyperLogLog进行成员资格测试是概率性的,因为它可能会返回假阳性,但永远不会返回假阴性。这意味着如果element in hll评估为True,那么你可能已经将元素插入到HyperLogLog中。但如果你评估element in hllFalse,那么你肯定没有插入它。

上下文计时器 ⏱️

ContextTimer可以帮助你轻松准确地测量经过的时间。请注意,ContextTimer测量的是墙时间(现实世界时间),而不是CPU时间;elapsed()返回的时间以毫秒为单位。

你可以单独使用ContextTimer...

>>> import time
>>> from pottery import ContextTimer
>>> timer = ContextTimer()
>>> timer.start()
>>> time.sleep(0.1)
>>> 100 <= timer.elapsed() < 200
True
>>> timer.stop()
>>> time.sleep(0.1)
>>> 100 <= timer.elapsed() < 200
True
>>>

…或者作为上下文管理器

>>> tests = []
>>> with ContextTimer() as timer:
...     time.sleep(0.1)
...     tests.append(100 <= timer.elapsed() < 200)
>>> time.sleep(0.1)
>>> tests.append(100 <= timer.elapsed() < 200)
>>> tests
[True, True]
>>>

贡献

获取源代码

  1. 克隆git仓库
    1. $ git clone git@github.com:brainix/pottery.git
    2. $ cd pottery/
  2. 安装项目级依赖项
    1. $ make install

运行测试

  1. 在一个终端会话中
    1. $ cd pottery/
    2. $ redis-server
  2. 在第二个终端会话中
    1. $ cd pottery/
    2. $ make test
    3. $ make test-readme

make test运行所有单元测试以及覆盖率测试。然而,有时在调试时运行单个测试模块、类或方法可能很有用。

  1. 在一个终端会话中
    1. $ cd pottery/
    2. $ redis-server
  2. 在第二个终端会话中
    1. 使用$ make test tests=tests.test_dict运行测试模块
    2. 使用:$ make test tests=tests.test_dict.DictTests运行测试类
    3. 使用:$ make test tests=tests.test_dict.DictTests.test_keyexistserror运行测试方法

make test-readme对README中的Python代码示例进行doctests,以确保其正确性。

项目详情


发布历史 发布通知 | RSS源

下载文件

下载适合您平台的文件。如果您不确定该选择哪个,请了解有关安装包的更多信息。

源代码分发

pottery-3.0.0.tar.gz (63.4 kB 查看哈希值)

上传时间 源代码

构建分发

pottery-3.0.0-py3-none-any.whl (63.5 kB 查看哈希值)

上传时间 Python 3

由以下组织支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面