Redis for Humans.
项目描述
Pottery: Redis for Humans 🌎🌍🌏
Redis 很棒,但 Redis命令 并非总是直观。Pottery 是一种访问Redis的Pythonic方法。如果您知道如何使用Python字典,那么您就已经知道如何使用Pottery。Pottery 可以让您更轻松地访问Redis,并用于实现微服务弹性模式;它已在规模生产中得到实战检验。
目录
- 字典 📖
- 集合 🛍️
- 列表 ⛓
- 计数器 🧮
- 双端队列 🖇️
- 队列 🚶♂️🚶♀️🚶♂️
- Redlock 🔒
- NextId 🔢
- redis_cache()
- CachedOrderedDict
- 布隆过滤器 🌸
- HyperLogLogs 🪵
- ContextTimer ⏱️
安装
$ pip3 install pottery
使用
首先,设置您的Redis客户端
>>> from redis import Redis
>>> redis = Redis.from_url('redis://localhost:6379/1')
>>>
字典 📖
RedisDict
是一个与Python的 dict
兼容的Redis后端容器。
以下是一个使用 RedisDict
的小例子
>>> from pottery import RedisDict
>>> tel = RedisDict({'jack': 4098, 'sape': 4139}, redis=redis, key='tel')
>>> tel['guido'] = 4127
>>> tel
RedisDict{'jack': 4098, 'sape': 4139, 'guido': 4127}
>>> tel['jack']
4098
>>> del tel['sape']
>>> tel['irv'] = 4127
>>> tel
RedisDict{'jack': 4098, 'guido': 4127, 'irv': 4127}
>>> list(tel)
['jack', 'guido', 'irv']
>>> sorted(tel)
['guido', 'irv', 'jack']
>>> 'guido' in tel
True
>>> 'jack' not in tel
False
>>>
注意 RedisDict()
的前两个关键字参数:第一个是您的Redis客户端。第二个是您的字典的Redis键名。除此之外,您可以使用 RedisDict
与任何其他Python dict
一样。
限制
- 键和值必须是JSON可序列化的。
集合 🛍️
RedisSet
是一个与Python的 set
兼容的Redis后端容器。
以下是一个简要的演示
>>> from pottery import RedisSet
>>> basket = RedisSet({'apple', 'orange', 'apple', 'pear', 'orange', 'banana'}, redis=redis, key='basket')
>>> sorted(basket)
['apple', 'banana', 'orange', 'pear']
>>> 'orange' in basket
True
>>> 'crabgrass' in basket
False
>>> a = RedisSet('abracadabra', redis=redis, key='magic')
>>> b = set('alacazam')
>>> sorted(a)
['a', 'b', 'c', 'd', 'r']
>>> sorted(a - b)
['b', 'd', 'r']
>>> sorted(a | b)
['a', 'b', 'c', 'd', 'l', 'm', 'r', 'z']
>>> sorted(a & b)
['a', 'c']
>>> sorted(a ^ b)
['b', 'd', 'l', 'm', 'r', 'z']
>>>
注意 RedisSet()
的两个关键字参数:第一个是您的Redis客户端。第二个是您集合的Redis键名。除此之外,您可以使用您的 RedisSet
与其他任何Python set
一样。
使用 .contains_many()
进行更高效的多个元素成员资格测试
>>> nirvana = RedisSet({'kurt', 'krist', 'dave'}, redis=redis, key='nirvana')
>>> tuple(nirvana.contains_many('kurt', 'krist', 'chat', 'dave'))
(True, True, False, True)
>>>
限制
- 元素必须是JSON可序列化的。
列表 ⛓
RedisList
是一个与Python的 list
兼容的Redis后端容器。
>>> from pottery import RedisList
>>> squares = RedisList([1, 4, 9, 16, 25], redis=redis, key='squares')
>>> squares
RedisList[1, 4, 9, 16, 25]
>>> squares[0]
1
>>> squares[-1]
25
>>> squares[-3:]
[9, 16, 25]
>>> squares[:]
[1, 4, 9, 16, 25]
>>> squares + [36, 49, 64, 81, 100]
RedisList[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
>>>
注意 RedisList()
的两个关键字参数:第一个是您的Redis客户端。第二个是您列表的Redis键名。除此之外,您可以使用您的 RedisList
与其他任何Python list
一样。
限制
- 元素必须是JSON可序列化的。
- 在底层,Python使用数组实现
list
,Redis使用双链表实现列表。因此,在RedisList
的头部或尾部插入元素是快速的,O(1)。然而,通过索引访问RedisList
元素是慢的,O(n)。因此,在性能和理想用例方面,RedisList
与Python的deque
更相似,而不是Python的list
。与其使用RedisList
,不如考虑使用RedisDeque
。
计数器 🧮
RedisCounter
是一个与Python的 collections.Counter
兼容的Redis后端容器。
>>> from pottery import RedisCounter
>>> c = RedisCounter(redis=redis, key='my-counter')
>>> c = RedisCounter('gallahad', redis=redis, key='my-counter')
>>> c.clear()
>>> c = RedisCounter({'red': 4, 'blue': 2}, redis=redis, key='my-counter')
>>> c.clear()
>>> c = RedisCounter(redis=redis, key='my-counter', cats=4, dogs=8)
>>> c.clear()
>>> c = RedisCounter(['eggs', 'ham'], redis=redis, key='my-counter')
>>> c['bacon']
0
>>> c['sausage'] = 0
>>> del c['sausage']
>>> c.clear()
>>> c = RedisCounter(redis=redis, key='my-counter', a=4, b=2, c=0, d=-2)
>>> sorted(c.elements())
['a', 'a', 'a', 'a', 'b', 'b']
>>> c.clear()
>>> RedisCounter('abracadabra', redis=redis, key='my-counter').most_common(3)
[('a', 5), ('b', 2), ('r', 2)]
>>> c.clear()
>>> c = RedisCounter(redis=redis, key='my-counter', a=4, b=2, c=0, d=-2)
>>> from collections import Counter
>>> d = Counter(a=1, b=2, c=3, d=4)
>>> c.subtract(d)
>>> c
RedisCounter{'a': 3, 'b': 0, 'c': -3, 'd': -6}
>>>
注意 RedisCounter()
的前两个关键字参数:第一个是您的Redis客户端。第二个是您计数器的Redis键名。除此之外,您可以使用您的 RedisCounter
与其他任何Python Counter
一样。
限制
- 键必须是JSON可序列化的。
双端队列 🖇️
RedisDeque
是一个与Python的 collections.deque
兼容的Redis后端容器。
示例
>>> from pottery import RedisDeque
>>> d = RedisDeque('ghi', redis=redis, key='letters')
>>> for elem in d:
... print(elem.upper())
G
H
I
>>> d.append('j')
>>> d.appendleft('f')
>>> d
RedisDeque(['f', 'g', 'h', 'i', 'j'])
>>> d.pop()
'j'
>>> d.popleft()
'f'
>>> list(d)
['g', 'h', 'i']
>>> d[0]
'g'
>>> d[-1]
'i'
>>> list(reversed(d))
['i', 'h', 'g']
>>> 'h' in d
True
>>> d.extend('jkl')
>>> d
RedisDeque(['g', 'h', 'i', 'j', 'k', 'l'])
>>> d.rotate(1)
>>> d
RedisDeque(['l', 'g', 'h', 'i', 'j', 'k'])
>>> d.rotate(-1)
>>> d
RedisDeque(['g', 'h', 'i', 'j', 'k', 'l'])
>>> RedisDeque(reversed(d), redis=redis)
RedisDeque(['l', 'k', 'j', 'i', 'h', 'g'])
>>> d.clear()
>>> d.extendleft('abc')
>>> d
RedisDeque(['c', 'b', 'a'])
>>>
注意 RedisDeque()
的两个关键字参数:第一个是您的Redis客户端。第二个是您双端队列的Redis键名。除此之外,您可以使用您的 RedisDeque
与其他任何Python deque
一样。
限制
- 元素必须是JSON可序列化的。
队列 🚶♂️🚶♀️🚶♂️
RedisSimpleQueue
是一个与Python的 queue.SimpleQueue
兼容的Redis后端多生产者、多消费者FIFO队列。通常,如果您在一个或多个线程中使用它,请使用Python queue.Queue
;如果您在进程间使用它,请使用 multiprocessing.Queue
;如果您需要跨机器共享它,或者如果需要您的队列在应用程序崩溃或重启后持久化,请使用 RedisSimpleQueue
。
实例化一个 RedisSimpleQueue
>>> from pottery import RedisSimpleQueue
>>> cars = RedisSimpleQueue(redis=redis, key='cars')
>>>
注意 RedisSimpleQueue()
的两个关键字参数:第一个是您的Redis客户端。第二个是您队列的Redis键名。除此之外,您可以使用您的 RedisSimpleQueue
与其他任何Python queue.SimpleQueue
一样。
检查队列状态,向队列中放入一些项目,并取出这些项目
>>> cars.empty()
True
>>> cars.qsize()
0
>>> cars.put('Jeep')
>>> cars.put('Honda')
>>> cars.put('Audi')
>>> cars.empty()
False
>>> cars.qsize()
3
>>> cars.get()
'Jeep'
>>> cars.get()
'Honda'
>>> cars.get()
'Audi'
>>> cars.empty()
True
>>> cars.qsize()
0
>>>
限制
- 项目必须是JSON可序列化的。
Redlock 🔒
Redlock
是一个安全且可靠的锁,用于协调跨线程、进程甚至机器共享资源的访问,没有单点故障。有关理由和算法描述。
Redlock
尽可能地实现了Python的出色threading.Lock
API。换句话说,你可以像使用threading.Lock
一样使用Redlock
。使用Redlock
而不是threading.Lock
的主要原因是Redlock
可以协调跨不同机器共享资源的访问;threading.Lock
则不能。
创建一个Redlock
>>> from pottery import Redlock
>>> printer_lock = Redlock(key='printer', masters={redis})
>>>
key
参数表示资源,masters
参数指定了要分布锁的Redis主节点。在生产环境中,你应该有5个Redis主节点。这是为了消除单点故障——即使丢失了5个Redis主节点中的2个,你的Redlock
仍然可用且性能良好。现在你可以保护对资源的访问了
>>> if printer_lock.acquire():
... print('printer_lock is locked')
... # Critical section - print stuff here.
... printer_lock.release()
printer_lock is locked
>>> bool(printer_lock.locked())
False
>>>
或者你可以在上下文管理器内保护对资源的访问
>>> with printer_lock:
... print('printer_lock is locked')
... # Critical section - print stuff here.
printer_lock is locked
>>> bool(printer_lock.locked())
False
>>>
每次需要保护资源时创建一个新的Redlock
对象,并且不要在不同部分的代码中共享Redlock
实例是最安全的。换句话说,将key
视为标识资源;不要将任何特定的Redlock
视为标识资源。每次需要锁时创建一个新的Redlock
可以避免由于将如何使用Redlock
与应用程序/服务的forking/threading模型解耦而产生的错误。
Redlock
会自动释放(默认情况下,在10秒后)。你应该确保关键部分在超时内完成。自动释放Redlock
的原因是保持“活性”并避免死锁(如果在释放锁之前进程在关键部分中死亡)。
>>> import time
>>> if printer_lock.acquire():
... # Critical section - print stuff here.
... time.sleep(10)
>>> bool(printer_lock.locked())
False
>>>
如果10秒不足以完成执行关键部分,则可以指定自己的自动释放时间(以秒为单位)
>>> printer_lock = Redlock(key='printer', masters={redis}, auto_release_time=15)
>>> if printer_lock.acquire():
... # Critical section - print stuff here.
... time.sleep(10)
>>> bool(printer_lock.locked())
True
>>> time.sleep(5)
>>> bool(printer_lock.locked())
False
>>>
默认情况下,.acquire()
会无限期地阻塞,直到获取锁。你可以通过指定blocking
参数使.acquire()
立即返回。如果获取了锁,则.acquire()
返回True
;如果没有获取,则返回False
。
>>> printer_lock_1 = Redlock(key='printer', masters={redis})
>>> printer_lock_2 = Redlock(key='printer', masters={redis})
>>> printer_lock_1.acquire(blocking=False)
True
>>> printer_lock_2.acquire(blocking=False) # Returns immediately.
False
>>> printer_lock_1.release()
>>>
你可以通过指定timeout
参数(以秒为单位)使.acquire()
阻塞,但不无限期
>>> printer_lock_1.acquire(timeout=1)
True
>>> printer_lock_2.acquire(timeout=1) # Waits 1 second.
False
>>> printer_lock_1.release()
>>>
你可以在初始化Redlock上下文管理器时配置其阻塞/超时行为。如果上下文管理器无法获取锁,则抛出QuorumNotAchieved
异常。
>>> import contextlib
>>> from pottery import QuorumNotAchieved
>>> printer_lock_1 = Redlock(key='printer', masters={redis}, context_manager_blocking=True, context_manager_timeout=0.2)
>>> printer_lock_2 = Redlock(key='printer', masters={redis}, context_manager_blocking=True, context_manager_timeout=0.2)
>>> with printer_lock_1:
... with contextlib.suppress(QuorumNotAchieved):
... with printer_lock_2: # Waits 0.2 seconds; raises QuorumNotAchieved.
... pass
... print(f"printer_lock_1 is {'locked' if printer_lock_1.locked() else 'unlocked'}")
... print(f"printer_lock_2 is {'locked' if printer_lock_2.locked() else 'unlocked'}")
printer_lock_1 is locked
printer_lock_2 is unlocked
>>>
同步() 👯♀️
synchronize()
是一个装饰器,它允许一次只有一个线程执行一个函数。在底层,synchronize()
使用Redlock,因此请参阅Redlock文档以获取更多信息。
这是如何使用synchronize()
的方法
>>> from pottery import synchronize
>>> @synchronize(key='synchronized-func', masters={redis}, auto_release_time=.5, blocking=True, timeout=-1)
... def func():
... # Only one thread can execute this function at a time.
... return True
...
>>>
NextId 🔢
NextId
可以安全可靠地在线程、进程甚至机器之间生成递增的ID,没有单点故障。请参阅动机和算法描述。
实例化一个ID生成器
>>> from pottery import NextId
>>> tweet_ids = NextId(key='tweet-ids', masters={redis})
>>>
key
参数表示序列(因此你可以为用户ID、评论ID等拥有不同的序列),masters
参数指定了要分布ID生成的Redis主节点(在生产环境中,你应该有5个Redis主节点)。现在,每次需要用户ID时,都可以在ID生成器上调用next()
。
>>> next(tweet_ids)
1
>>> next(tweet_ids)
2
>>> next(tweet_ids)
3
>>>
有两个注意事项
- 如果许多客户端正在并发生成ID,那么ID序列可能会有“空洞”(例如:1,2,6,10,11,21,……)。
- 此算法可以扩展到每秒约5,000个ID(使用5个Redis主节点)。如果你需要比这更快的ID,那么你可能需要考虑其他技术。
redis_cache()
redis_cache()
是一个简单的轻量级无界函数返回值缓存,有时也称为 “memoize”。 redis_cache()
尽可能地实现了 Python 的优秀 functools.cache()
API。换句话说,你可以像使用 functools.cache()
一样使用 redis_cache()
。
限制
- 函数的参数必须是可哈希的。
- 函数的返回值必须是可序列化为 JSON 的。
- 和
functools.cache()
一样,redis_cache()
不允许最大大小,也不会驱逐旧值,它会无界增长。仅在以下情况下使用redis_cache()
- 你的函数参数空间具有已知的小基数。
- 当调用
redis_cache()
来装饰你的函数时,指定一个timeout
,以便在最后一次缓存访问(命中或未命中)后的timeout
秒钟删除你的 整个 返回值缓存。 - 你定期调用
.cache_clear()
来删除你的 整个 返回值缓存。 - 你接受返回值缓存无界增长,并且你 了解这对你的底层 Redis 实例的影响。
一般来说,你应该只在想要重用之前计算过的值时使用 redis_cache()
。因此,没有必要缓存具有副作用或不纯函数(如 time()
或 random()
)的函数。
装饰一个函数
>>> import time
>>> from pottery import redis_cache
>>> @redis_cache(redis=redis, key='expensive-function-cache')
... def expensive_function(n):
... time.sleep(1) # Simulate an expensive computation or database lookup.
... return n
...
>>>
注意 redis_cache()
的两个关键字参数:第一个是你的 Redis 客户端。第二个是你函数返回值缓存的 Redis 键名。
调用你的函数并观察缓存命中/未命中率
>>> expensive_function(5)
5
>>> expensive_function.cache_info()
CacheInfo(hits=0, misses=1, maxsize=None, currsize=1)
>>> expensive_function(5)
5
>>> expensive_function.cache_info()
CacheInfo(hits=1, misses=1, maxsize=None, currsize=1)
>>> expensive_function(6)
6
>>> expensive_function.cache_info()
CacheInfo(hits=1, misses=2, maxsize=None, currsize=2)
>>>
注意,第一次调用 expensive_function()
需要 1 秒并导致缓存未命中;但第二次调用几乎立即返回并导致缓存命中。这是因为第一次调用后,redis_cache()
缓存了当 n == 5
时调用的返回值。
你可以通过 expensive_function.__wrapped__
访问原始未装饰的底层 expensive_function()
。这对于内省、绕过缓存或使用不同的缓存重新包装原始函数很有用。
你可以使用 expensive_function.__bypass__(*args, **kwargs)
强制为特定的 args
/kwargs
组合重置缓存。调用 expensive_function.__bypass__(*args, **kwargs)
绕过缓存查找,调用原始底层函数,然后为将来的 expensive_function(*args, **kwargs)
调用缓存结果。请注意,调用 expensive_function.__bypass__(*args, **kwargs)
既不产生缓存命中也不产生缓存未命中。
最后,使用 expensive_function.cache_clear()
清除/使你的函数的整个返回值缓存无效。
>>> expensive_function.cache_info()
CacheInfo(hits=1, misses=2, maxsize=None, currsize=2)
>>> expensive_function.cache_clear()
>>> expensive_function.cache_info()
CacheInfo(hits=0, misses=0, maxsize=None, currsize=0)
>>>
CachedOrderedDict
通过一个示例用例来解释 CachedOrderedDict
是最好的方式。想象一下,你的搜索引擎返回文档 ID,然后你必须通过数据库将其转换为完整的文档以返回给客户端。用于表示此类搜索结果的数据结构必须具有以下属性
- 它必须保留搜索引擎返回的文档 ID 的顺序。
- 它必须将文档 ID 映射到填充的文档。
- 它必须缓存之前填充的文档。
属性 1 和 2 由 Python 的 collections.OrderedDict
满足。然而,CachedOrderedDict
扩展了 Python 的 OrderedDict
以满足属性 3。
CachedOrderedDict
最常见的使用模式如下
- 使用你必须在初始化器中传递给
dict_keys
参数的 IDs 实例化CachedOrderedDict
。 - 计算并存储缓存未命中以供未来查找。
- 将你的
CachedOrderedDict
的某些表示返回给客户端。
实例化一个 CachedOrderedDict
>>> from pottery import CachedOrderedDict
>>> search_results_1 = CachedOrderedDict(
... redis_client=redis,
... redis_key='search-results',
... dict_keys=(1, 2, 3, 4, 5),
... )
>>>
初始化参数中的 redis_client
是您的 Redis 客户端,而 redis_key
参数是 Redis 哈希表背后的 Redis 键。dict_keys
参数表示要查找并自动填充到您的 CachedOrderedDict
(在缓存命中时)的有序可迭代键,或者您将不得不为未来的查找(在缓存未命中时)计算和填充的键。无论键是缓存命中还是未命中,CachedOrderedDict
都会保留 dict_keys
的顺序(类似于列表),将这些键映射到值(类似于字典),并维护一个用于未来键查找的底层缓存。
一开始,缓存是空的,所以让我们填充它
>>> sorted(search_results_1.misses())
[1, 2, 3, 4, 5]
>>> search_results_1[1] = 'one'
>>> search_results_1[2] = 'two'
>>> search_results_1[3] = 'three'
>>> search_results_1[4] = 'four'
>>> search_results_1[5] = 'five'
>>> sorted(search_results_1.misses())
[]
>>>
注意,CachedOrderedDict
保留了 dict_keys
的顺序
>>> for key, value in search_results_1.items():
... print(f'{key}: {value}')
1: one
2: two
3: three
4: four
5: five
>>>
现在,让我们看看缓存命中和未命中的组合
>>> search_results_2 = CachedOrderedDict(
... redis_client=redis,
... redis_key='search-results',
... dict_keys=(2, 4, 6, 8, 10),
... )
>>> sorted(search_results_2.misses())
[6, 8, 10]
>>> search_results_2[2]
'two'
>>> search_results_2[6] = 'six'
>>> search_results_2[8] = 'eight'
>>> search_results_2[10] = 'ten'
>>> sorted(search_results_2.misses())
[]
>>> for key, value in search_results_2.items():
... print(f'{key}: {value}')
2: two
4: four
6: six
8: eight
10: ten
>>>
限制
- 键和值必须是JSON可序列化的。
布隆过滤器 🌸
布隆过滤器是一种强大的数据结构,可以帮助您回答问题:“我之前是否见过这个元素?”和“我见过多少个不同的元素?”但无法回答问题:“我之前都见过哪些元素?”所以将布隆过滤器视为可以添加元素、用于测试元素成员资格并获取长度的 Python 集合;但是您无法遍历或取出元素。
布隆过滤器是概率性的,这意味着它们有时可能会产生假阳性(例如,它们可能会报告您之前见过一个特定的元素,尽管您没有)。但它们永远不会产生假阴性(所以每次它们报告您之前没有见过一个特定的元素时,您确实从未见过它)。您可以通过调整您可接受的假阳性概率来调整,但这将以布隆过滤器的存储大小和元素插入/查找时间为代价。
创建一个 BloomFilter
>>> from pottery import BloomFilter
>>> dilberts = BloomFilter(
... num_elements=100,
... false_positives=0.01,
... redis=redis,
... key='dilberts',
... )
>>>
在这里,num_elements
代表您预计将插入到 BloomFilter
中的元素数量,而 false_positives
代表您可接受的假阳性概率。使用这两个参数,BloomFilter
会自动计算其存储大小和元素插入/查找时运行哈希函数的次数,以确保在您将插入指定数量的元素的情况下,可以保证假阳性率在您可容忍的范围内。
将元素插入到 BloomFilter
>>> dilberts.add('rajiv')
>>>
测试 BloomFilter
中的成员资格
>>> 'rajiv' in dilberts
True
>>> 'raj' in dilberts
False
>>> 'dan' in dilberts
False
>>>
看看我们已插入到 BloomFilter
中的元素数量
>>> len(dilberts)
1
>>>
请注意,BloomFilter.__len__()
是一个近似值,而不是精确值,尽管它相当准确。
将多个元素插入到 BloomFilter
>>> dilberts.update({'raj', 'dan'})
>>>
使用 .contains_many()
进行更高效的多个元素成员资格测试
>>> tuple(dilberts.contains_many('rajiv', 'raj', 'dan', 'luis'))
(True, True, True, False)
>>>
从 BloomFilter
中移除所有元素
>>> dilberts.clear()
>>> len(dilberts)
0
>>>
限制
- 元素必须是JSON可序列化的。
len(bf)
是概率性的,它是一个准确的近似值。您可以通过调整.__init__()
方法中的num_elements
和false_positives
参数来调整您想要的准确性,但这将以存储空间和插入/查找时间为代价。- 对布隆过滤器的成员资格测试是概率性的,因为它可能会返回假阳性,但永远不会返回假阴性。这意味着如果
element in bf
评估为True
,那么您可能已将元素插入到布隆过滤器中。但如果element in bf
评估为False
,那么您绝对没有插入该元素。再次强调,您可以通过调整.__init__()
方法中的num_elements
和false_positives
参数来调整准确性,但这将以存储空间和插入/查找时间为代价。
HyperLogLogs 🪵
HyperLogLogs 是一种有趣的数据结构,旨在回答问题:“我见过多少个不同的元素?”但它无法回答问题:“我之前是否见过这个元素?”或“我之前都见过哪些元素?”所以将 HyperLogLogs 视为可以添加元素并获取长度的 Python 集合;但是您无法使用它来测试元素成员资格、遍历或取出元素。
HyperLogLogs(超日志日志)是概率性的,这意味着它们在误差范围内(最多2%)是准确的。然而,它们可以使用极小的存储空间(1.5 KB)合理准确地估计大规模数据集(如每天唯一的谷歌搜索数量)的大小。
创建一个HyperLogLog
>>> from pottery import HyperLogLog
>>> google_searches = HyperLogLog(redis=redis, key='google-searches')
>>>
将一个元素插入到HyperLogLog
>>> google_searches.add('sonic the hedgehog video game')
>>>
查看我们已插入到HyperLogLog
中的元素数量
>>> len(google_searches)
1
>>>
将多个元素插入到HyperLogLog
>>> google_searches.update({
... 'google in 1998',
... 'minesweeper',
... 'joey tribbiani',
... 'wizard of oz',
... 'rgb to hex',
... 'pac-man',
... 'breathing exercise',
... 'do a barrel roll',
... 'snake',
... })
>>> len(google_searches)
10
>>>
通过一个巧妙的技巧,我们可以对HyperLogLog
进行成员资格测试,尽管它从未为此目的而设计。该技巧的工作方式是创建HyperLogLog
的临时副本,然后将在成员资格测试中运行的元素插入到临时副本中。如果插入更改了临时HyperLogLog
的基数,则该元素肯定没有插入到原始HyperLogLog
中。
>>> 'joey tribbiani' in google_searches
True
>>> 'jennifer aniston' in google_searches
False
>>>
使用 .contains_many()
进行更高效的多个元素成员资格测试
>>> tuple(google_searches.contains_many('joey tribbiani', 'jennifer aniston'))
(True, False)
>>>
从HyperLogLog
中删除所有元素
>>> google_searches.clear()
>>> len(google_searches)
0
>>>
限制
- 元素必须是JSON可序列化的。
len(hll)
是概率性的,因为它是一个准确的近似值。- 对HyperLogLog进行成员资格测试是概率性的,因为它可能会返回假阳性,但永远不会返回假阴性。这意味着如果
element in hll
评估为True
,那么你可能已经将元素插入到HyperLogLog中。但如果你评估element in hll
为False
,那么你肯定没有插入它。
上下文计时器 ⏱️
ContextTimer
可以帮助你轻松准确地测量经过的时间。请注意,ContextTimer
测量的是墙时间(现实世界时间),而不是CPU时间;elapsed()
返回的时间以毫秒为单位。
你可以单独使用ContextTimer
...
>>> import time
>>> from pottery import ContextTimer
>>> timer = ContextTimer()
>>> timer.start()
>>> time.sleep(0.1)
>>> 100 <= timer.elapsed() < 200
True
>>> timer.stop()
>>> time.sleep(0.1)
>>> 100 <= timer.elapsed() < 200
True
>>>
…或者作为上下文管理器
>>> tests = []
>>> with ContextTimer() as timer:
... time.sleep(0.1)
... tests.append(100 <= timer.elapsed() < 200)
>>> time.sleep(0.1)
>>> tests.append(100 <= timer.elapsed() < 200)
>>> tests
[True, True]
>>>
贡献
获取源代码
- 克隆git仓库
$ git clone git@github.com:brainix/pottery.git
$ cd pottery/
- 安装项目级依赖项
$ make install
运行测试
- 在一个终端会话中
$ cd pottery/
$ redis-server
- 在第二个终端会话中
$ cd pottery/
$ make test
$ make test-readme
make test
运行所有单元测试以及覆盖率测试。然而,有时在调试时运行单个测试模块、类或方法可能很有用。
- 在一个终端会话中
$ cd pottery/
$ redis-server
- 在第二个终端会话中
- 使用
$ make test tests=tests.test_dict
运行测试模块 - 使用:
$ make test tests=tests.test_dict.DictTests
运行测试类 - 使用:
$ make test tests=tests.test_dict.DictTests.test_keyexistserror
运行测试方法
- 使用
make test-readme
对README中的Python代码示例进行doctests,以确保其正确性。
项目详情
下载文件
下载适合您平台的文件。如果您不确定该选择哪个,请了解有关安装包的更多信息。