跳转到主要内容

MongoDB连接池和容器实现,用于Zope3

项目描述

此包提供了一种基于某些核心Zope组件库的MongoDB对象映射框架,包括Zope事务支持。此包可以使用或不需要使用zope.persistent,并且可以作为ZODB的完全替代品。该包基于Zope本身并不庞大,可以在任何需要从MongoDB到Python对象的桥梁的Python项目中使用。

README

重要:如果您使用–all选项运行测试,将在端口45017启动一个真实的MongoDB模拟服务器!

此包提供非持久化MongoDB对象实现。如果您想在混合MongoDB/ZODB应用程序设置中使用它们,可以轻松地将它们与Persistent.Persistent和Contained.Contained混合。我们目前将此框架用作ORM(对象关系映射),将MongoDB对象映射到基于Python/Zope的schema对象,包括验证等。

在我们的上一个项目中,我们从一个混合ZODB/MongoDB应用程序开始,我们将Persistent.persistent混合到IMongoContainer对象中。但后来我们对性能和稳定性非常兴奋,所以我们完全移除了ZODB持久化层。现在我们在应用程序中使用无ZODB设置,从非持久化项目作为我们的应用程序根开始。用于此类无ZODB应用程序设置的所需工具都位于p01.publisher和p01.recipe.setup包中。

注意:一些测试使用位于m01/mongo/testing的伪造MongoDB,而其他测试将使用我们来自m01.stub包的MongoDB模拟。如果您想运行完整的测试,可以使用–all选项,这将启动和停止MongoDB模拟服务器。

注意:所有MongoDB项目接口将不会提供ILocation或IContained,但基础MongoDB项目实现将实现Location,它直接提供ILocation接口。这使得在ZCML中的权限声明更简单。

设置

>>> import pymongo
>>> import zope.component
>>> from m01.mongo import interfaces

MongoClient

设置Mongo客户端

>>> client = pymongo.MongoClient('localhost', 45017)
>>> client
MongoClient(host=['127.0.0.1:45017'])

如您所见,客户端可以访问数据库

>>> db = client.m01MongoTesting
>>> db
Database(MongoClient(host=['127.0.0.1:45017']), u'm01MongoTesting')

一个数据库可以返回一个集合

>>> collection = db['m01MongoTest']
>>> collection
Collection(Database(MongoClient(host=['127.0.0.1:45017']), u'm01MongoTesting'), u'm01MongoTest')

如您所见,我们可以向集合写入

>>> res = collection.update_one({'_id': '123'}, {'$inc': {'counter': 1}},
...     upsert=True)
>>> res
<pymongo.results.UpdateResult object at ...>
>>> res.raw_result
{'updatedExisting': False, 'nModified': 0, 'ok': 1, 'upserted': '123', 'n': 1}

我们可以从集合中读取

>>> collection.find_one({'_id': '123'})
{u'_id': u'123', u'counter': 1}

从我们的测试集合中删除结果

>>> res = collection.delete_one({'_id': '123'})
>>> res
<pymongo.results.DeleteResult object at ...>
>>> res.raw_result
{'ok': 1, 'n': 1}

拆解

现在使用我们的当前MongoDB连接拆解我们的MongoDB数据库

>>> import time
>>> time.sleep(1)
>>> client.drop_database('m01MongoTesting')

MongoContainer

MongoContainer可以在MongoDB中存储IMongoContainerItem对象。MongoContainerItem必须能够将其数据导出到有效的MongoDB数据。此测试将展示我们的MongoContainer如何工作。

条件

首先导入一些组件

>>> import json
>>> import transaction
>>> import zope.interface
>>> import zope.schema
>>> import m01.mongo.item
>>> import m01.mongo.testing
>>> from m01.mongo.fieldproperty import MongoFieldProperty
>>> from m01.mongo import interfaces

在我们开始测试之前,检查我们的线程局部缓存是否为空或是否有来自以前测试的残留垃圾

>>> from m01.mongo import LOCAL
>>> m01.mongo.testing.pprint(LOCAL.__dict__)
{}

设置

并设置一个数据库根

>>> root = {}

MongoContainerItem

>>> class ISampleContainerItem(interfaces.IMongoContainerItem,
...     zope.location.interfaces.ILocation):
...     """Sample item interface."""
...
...     title = zope.schema.TextLine(
...         title=u'Object Title',
...         description=u'Object Title',
...         required=True)
>>> class SampleContainerItem(m01.mongo.item.MongoContainerItem):
...     """Sample container item"""
...
...     zope.interface.implements(ISampleContainerItem)
...
...     title = MongoFieldProperty(ISampleContainerItem['title'])
...
...     dumpNames = ['title']

MongoContainer

>>> class ISampleContainer(interfaces.IMongoContainer):
...     """Sample container interface."""
>>> class SampleContainer(m01.mongo.container.MongoContainer):
...     """Sample container."""
...
...     zope.interface.implements(ISampleContainer)
...
...     @property
...     def collection(self):
...         db = m01.mongo.testing.getTestDatabase()
...         return db['test']
...
...     def load(self, data):
...         """Load data into the right mongo item."""
...         return SampleContainerItem(data)
>>> container = SampleContainer()
>>> root['container'] = container

创建一个对象树

现在我们可以使用映射API将一个示例MongoContainerItem添加到我们的容器中

>>> data = {'title': u'Title'}
>>> item = SampleContainerItem(data)
>>> container = root['container']
>>> container[u'item'] = item

事务

Zope为存储在数据库中的对象提供事务。我们也提供这样的交易和一个交易数据管理器来将我们的对象存储在MongoDB中。这意味着目前我们的测试数据库中没有存储任何内容,因为我们还没有提交事务

>>> collection = m01.mongo.testing.getTestCollection()
>>> collection.count()
0

让我们提交我们的交易并将容器项目存储在MongoDB中

>>> transaction.commit()
>>> collection = m01.mongo.testing.getTestCollection()
>>> collection.count()
1

提交后,线程局部存储为空

>>> LOCAL.__dict__
{}

MongoDB数据

如您所见,以下数据被存储在我们的MongoDB中

>>> data = collection.find_one({'__name__': 'item'})
>>> m01.mongo.testing.pprint(data)
{u'__name__': u'item',
 u'_id': ObjectId('...'),
 u'_pid': None,
 u'_type': u'SampleContainerItem',
 u'_version': 1,
 u'created': datetime.datetime(..., tzinfo=UTC),
 u'modified': datetime.datetime(..., tzinfo=UTC),
 u'title': u'Title'}

对象

我们可以从我们的容器中获取,MongoDB将从中加载数据

>>> obj = container[u'item']
>>> obj
<SampleContainerItem u'item'>
>>> obj.title
u'Title'

让我们拆解我们的测试设置

>>> transaction.commit()
>>> from m01.mongo import clearThreadLocalCache
>>> clearThreadLocalCache()

如您所见,我们的缓存项被删除

>>> from m01.mongo import LOCAL
>>> m01.mongo.testing.pprint(LOCAL.__dict__)
{}

MongoStorage

MongoStorage可以在MongoDB中存储IMongoStorageItem对象。MongoStorageItem必须能够将其数据导出到有效的MongoDB值。此测试将展示我们的MongoStorage如何工作,同时也展示了其局限性。

注意:mongo 容器也实现了类似于存储实现的容器/映射模式。唯一的区别是,容器仅提供使用 container[key] = obj、container[key] 和 del container[key] 的映射 API。存储 API 不提供显式的映射键,而是提供添加和删除方法。这意味着容器使用自己的命名模式,而存储使用 mongodb._id 作为其对象名称(obj.__name__)。

条件

在开始测试之前,检查我们的线程局部缓存是否为空,或者是否有之前的测试遗留下来的垃圾。

>>> from m01.mongo import LOCAL
>>> from m01.mongo.testing import pprint
>>> pprint(LOCAL.__dict__)
{}

设置

首先导入一些组件

>>> import datetime
>>> import transaction
>>> from zope.container.interfaces import IReadContainer
>>> from m01.mongo import interfaces
>>> from m01.mongo import testing

并设置一个数据库根

>>> root = {}

MongoStorageItem

Mongo 项默认提供存储为 _id 的 ObjectId。如果在创建对象时没有提供,我们将设置一个。

>>> data = {}
>>> obj = testing.SampleStorageItem(data)
>>> obj._id
ObjectId('...')

ObjectId 也用作我们的 __name__ 值。如果需要选择自己的名称,请参阅 MongoContainer 和 MongoContainerItem 的实现。

>>> obj.__name__
u'...'
>>> obj.__name__ == unicode(obj._id)
True

Mongo 项还提供创建和修改日期属性。如果我们初始化一个没有给定创建日期的对象,将使用新的 utc datetime 实例。

>>> obj.created
datetime.datetime(..., tzinfo=UTC)
>>> obj.modified is None
True

Mongo 存储项知道状态是否已更改。这意味着我们可以找出是否应该将项写回 MongoDB。MongoItem 将状态存储在 _m_changed 值中,就像持久对象在 _p_changed 中做的那样。如你所见,初始状态是 `None`

>>> obj._m_changed is None
True

MongoItem 还有一个版本号,我们每次更改项时都会将其递增。默认情况下,此版本设置为 _version 属性,默认为 0(零)。

>>> obj._version
0

如果我们更改 MongoItem 中的值,状态将更改

>>> obj.title = u'New Title'
>>> obj._m_changed
True

但版本号不会增加。我们只有在将项保存到 MongoDB 中时才增加版本号。

>>> obj._version
0

如果删除值,我们也会更改 _m_change 标记。

>>> obj = testing.SampleStorageItem(data)
>>> obj._m_changed is None
True
>>> obj.title
u''
>>> obj.title = u'New Title'
>>> obj._m_changed
True
>>> obj.title
u'New Title'

现在在我们删除属性之前,将 _m_chande 属性设置为 False。

>>> obj._m_changed = False
>>> obj._m_changed
False
>>> del obj.title

如你所见,我们可以删除一个属性,但它只会回退到默认架构字段值。这似乎是可行的。

>>> obj.title
u''
>>> obj._m_changed
True

MongoStorage

现在我们可以将 MongoStorage 添加到 zope 数据库中。

>>> storage = testing.SampleStorage()
>>> root['storage'] = storage
>>> transaction.commit()

现在我们可以将一个示例 MongoStorageItem 添加到我们的存储中。注意,我们只能使用返回新生成 __name__ 的 add 方法。此实现不支持使用自己的名称。如你所见,名称是 MongoDB 24 个十六进制字符字符串 ObjectId 表示。

>>> data = {'title': u'Title',
...         'description': u'Description'}
>>> item = testing.SampleStorageItem(data)
>>> storage = root['storage']

我们的存储提供 IMongoStorage 和 IReadContainer 接口。

>>> interfaces.IMongoStorage.providedBy(storage)
True
>>> IReadContainer.providedBy(storage)
True

添加

我们可以通过使用 add 方法将 mongo 项添加到我们的存储中。

>>> __name__ = storage.add(item)
>>> __name__
u'...'
>>> len(__name__)
24
>>> transaction.commit()

添加我们的项后,项提供了一个创建日期。

>>> item.created
datetime.datetime(..., tzinfo=UTC)

__len__

>>> storage = root['storage']
>>> len(storage)
1

__getitem__

>>> item = storage[__name__]
>>> item
<SampleStorageItem ...>

如你所见,我们的 MongoStorageItem 提供以下数据。我们可以转储项。请注意,你可能需要实现一个自定义转储方法,该方法将转储适合你的 MongoStorageItem 的正确数据。

>>> pprint(item.dump())
{'__name__': '...',
 '_id': ObjectId('...'),
 '_pid': None,
 '_type': 'SampleStorageItem',
 '_version': 1,
 'comments': [],
 'created': datetime.datetime(..., tzinfo=UTC),
 'date': None,
 'description': 'Description',
 'item': None,
 'modified': datetime.datetime(..., tzinfo=UTC),
 'number': None,
 'numbers': [],
 'title': 'Title'}

该对象还提供了一个名称,这是我们在添加对象时获得的名称。

>>> item.__name__ == __name__
True

keys

容器还可以返回键。

>>> tuple(storage.keys())
(u'...',)

values

容器还可以返回值。

>>> tuple(storage.values())
(<SampleStorageItem ...>,)

items

容器还可以返回项。

>>> tuple(storage.items())
((u'...', <SampleStorageItem ...>),)

__delitem__

接下来我们将删除项。

>>> del storage[__name__]
>>> storage.get(__name__) is None
True
>>> transaction.commit()

对象修改

如果我们从存储中获取一个 mongo 项并修改该项,版本号将增加一,并设置当前修改的 datetime。

让我们添加一个新项。

>>> data = {'title': u'A Title',
...         'description': u'A Description'}
>>> item = testing.SampleStorageItem(data)
>>> __name__ = storage.add(item)
>>> transaction.commit()

现在获取项

>>> item = storage[__name__]
>>> item.title
u'A Title'

并更改标题

>>> item.title = u'New Title'
>>> item.title
u'New Title'

如你所见,项被标记为已更改

>>> item._m_changed
True

现在获取 mongo 项版本。这应该设置为 1(一),因为我们只添加了对象,自添加以来没有更改

>>> item._version
1

如果我们现在提交事务,版本号将增加一

>>> transaction.commit()
>>> item._version
2

如果现在从 MongoDB 再次加载 mongo 项,您可以看到标题已更改

>>> item = storage[__name__]
>>> item.title
u'New Title'

并且版本已更新为 2

>>> item._version
2
>>> transaction.commit()

在我们离开这个测试之前检查我们的线程局部缓存

>>> pprint(LOCAL.__dict__)
{}

共享 MongoContainer

MongoContainer 可以在 MongoDB 中存储非持久性 IMongoContainerItem 对象。MongoContainerItem 必须能够将其数据转储为有效的 mongo 值。这个测试将展示我们的 MongoContainer 如何工作。

条件

在开始测试之前,检查我们的线程局部缓存是否为空,或者是否有之前的测试遗留下来的垃圾。

>>> from m01.mongo.testing import pprint
>>> from m01.mongo import LOCAL
>>> pprint(LOCAL.__dict__)
{}

设置

首先导入一些组件

>>> import datetime
>>> import transaction
>>> from zope.container.interfaces import IContainer
>>> import m01.mongo
>>> import m01.mongo.base
>>> import m01.mongo.container
>>> from m01.mongo import interfaces
>>> from m01.mongo import testing

我们还需要一个应用程序根对象。让我们定义一个静态 MongoContainer 作为我们的应用程序数据库根项。

>>> class MongoRoot(m01.mongo.container.MongoContainer):
...     """Mongo application root"""
...
...     _id = m01.mongo.getObjectId(0)
...
...     def __init__(self):
...         pass
...
...     @property
...     def collection(self):
...         return testing.getRootItems()
...
...     @property
...     def cacheKey(self):
...         return 'root'
...
...     def load(self, data):
...         """Load data into the right mongo item."""
...         return testing.Companies(data)
...
...     def __repr__(self):
...         return '<%s %s>' % (self.__class__.__name__, self._id)

如您所见,我们的 MongoRoot 类定义了一个静态 mongo ObjectID 作为 _id。这意味着每次都使用相同的中_id。这个 _id 充作我们的 __parent__ 引用。

以下方法允许我们生成新的 MongoRoot 实例。这允许我们展示我们如何生成不同的根项,就像我们在服务器重启时做的那样。

>>> def getRoot():
...     return MongoRoot()

这是我们数据库根项

>>> root = getRoot()
>>> root
<MongoRoot 000000000000000000000000>
>>> root._id
ObjectId('000000000000000000000000')

容器

现在让我们使用我们的增强型测试数据和设置内容结构

>>> data = {'name': u'Europe'}
>>> europe = testing.Companies(data)
>>> root[u'europe'] = europe
>>> data = {'name': u'Asia'}
>>> asia = testing.Companies(data)
>>> root[u'asia'] = asia
>>> transaction.commit()

现在让我们检查 MongoDB 中的公司

>>> rootCollection = testing.getRootItems()
>>> obj = rootCollection.find_one({'name': 'Europe'})
>>> pprint(obj)
{u'__name__': u'europe',
 u'_id': ObjectId('...'),
 u'_pid': ObjectId('...'),
 u'_type': u'Companies',
 u'_version': 1,
 u'created': datetime.datetime(..., tzinfo=UTC),
 u'modified': datetime.datetime(..., tzinfo=UTC),
 u'name': u'Europe'}

现在让我们添加一个公司、雇主和一些文档

>>> data = {'name': u'Projekt01 GmbH'}
>>> pro = testing.Company(data)
>>> europe[u'pro'] = pro
>>> data = {'name': u'Roger Ineichen'}
>>> roger = testing.Employer(data)
>>> pro[u'roger'] = roger
>>> data = {'name': u'Manual'}
>>> manual = testing.Document(data)
>>> roger[u'manual'] = manual
>>> transaction.commit()

如您所见,我们使用容器和项对象添加了数据结构

>>> root['europe']
<Companies u'europe'>
>>> root['europe']['pro']
<Company u'pro'>
>>> root['europe']['pro']['roger']
<Employer u'roger'>
>>> root['europe']['pro']['roger']['manual']
<Document u'manual'>

如您所见,这个结构与它们的 __parent__ 引用相关。这意味着如果我们将另一个结构添加到同一个 MongoDB 中,每个项都知道它的容器。

>>> data = {'name': u'Credit Suisse'}
>>> cs = testing.Company(data)
>>> asia[u'cs'] = cs
>>> data = {'name': u'Max Muster'}
>>> max = testing.Employer(data)
>>> cs[u'max'] = max
>>> data = {'name': u'Paper'}
>>> paper = testing.Document(data)
>>> max[u'paper'] = paper
>>> transaction.commit()
>>> root['asia']
<Companies u'asia'>
>>> root['asia']['cs']
<Company u'cs'>
>>> root['asia']['cs']['max']
<Employer u'max'>
>>> root['asia']['cs']['max']['paper']
<Document u'paper'>

我们不能从另一个父容器访问相同类型的其他项

>>> root['europe']['cs']
Traceback (most recent call last):
...
KeyError: 'cs'
>>> transaction.commit()

如您所见, KeyError 将项留在了我们的线程局部缓存中。我们可以使用我们的线程局部缓存清理事件处理器,该处理器默认注册为 EndRequestEvent 订阅者以清理线程局部缓存

>>> pprint(LOCAL.__dict__)
{u'europe': {'loaded': {}, 'removed': {}}}

让我们使用我们的订阅者

>>> from m01.mongo import clearThreadLocalCache
>>> clearThreadLocalCache()

如您所见,我们的缓存项被删除

>>> from m01.mongo import LOCAL
>>> pprint(LOCAL.__dict__)
{}

共享容器

现在让我们实现一个包含所有 IEmployer 项的共享容器

>>> class SharedEployers(m01.mongo.container.MongoContainer):
...     """Shared Employer container"""
...
...     # mark a container as shared by set the _mpid to None
...     _mpid = None
...
...     @property
...     def collection(self):
...         return testing.getEmployers()
...
...     def load(self, data):
...         return testing.Employer(data)

现在让我们尝试看共享容器是否可以访问所有雇主项

>>> shared = SharedEployers()
>>> pprint(tuple(shared.items()))
((u'roger', <Employer u'roger'>), (u'max', <Employer u'max'>))
>>> for obj in shared.values():
...     pprint(obj.dump())
{'__name__': u'roger',
 '_id': ObjectId('...'),
 '_pid': ObjectId('...'),
 '_type': u'Employer',
 '_version': 1,
 'created': datetime.datetime(..., tzinfo=UTC),
 'modified': datetime.datetime(..., tzinfo=UTC),
 'name': u'Roger Ineichen'}
{'__name__': u'max',
 '_id': ObjectId('...'),
 '_pid': ObjectId('...'),
 '_type': u'Employer',
 '_version': 1,
 'created': datetime.datetime(..., tzinfo=UTC),
 'modified': datetime.datetime(..., tzinfo=UTC),
 'name': u'Max Muster'}

现在提交我们的事务,这将清理我们的缓存。数据库清理是在我们的测试拆解中完成的

>>> transaction.commit()

在我们离开这个测试之前检查我们的线程局部缓存

>>> pprint(LOCAL.__dict__)
{}

MongoObject

MongoObject 可以独立于其他任何内容存储在 MongoDB 中。此类 MongoObject 可以与一个名为 MongoOjectProperty 的字段属性一起使用。字段属性负责将此类 MongoObject 设置和获取到 MongoDB。在 MongoObjectProperty 中提供此类 MongoObject 的持久项只需要提供一个具有唯一值的 oid 属性。您可以使用 m01.oid 包来获取此类唯一 oid 或实现自己的模式。

MongoObject 使用 __parent__._moid 和属性(字段)名称作为其唯一的 MongoDB 键。

注意,此测试使用了一个假 MongoDB 服务器设置。但这个假服务器远远没有完成。如果我们在其他项目中需要更多功能,我们将添加更多功能到这个假服务器。有关更多信息,请参阅 testing.py。

条件

在开始测试之前,检查我们的线程局部缓存是否为空,或者是否有之前的测试遗留下来的垃圾。

>>> from m01.mongo.testing import pprint
>>> from m01.mongo import LOCAL
>>> pprint(LOCAL.__dict__)
{}

设置

首先导入一些组件

>>> import datetime
>>> import transaction
>>> from m01.mongo import interfaces
>>> from m01.mongo import testing

首先,我们需要设置一个持久对象

>>> content = testing.Content(42)
>>> content._moid
42

并将它们添加到 ZODB

>>> root = {}
>>> root['content'] = content
>>> transaction.commit()
>>> content = root['content']
>>> content
<Content 42>

MongoObject

现在让我们将一个 MongoObject 实例添加到我们的示例内容对象中

>>> data = {'title': u'Mongo Object Title',
...         'description': u'A Description',
...         'item': {'text':u'Item'},
...         'date': datetime.date(2010, 2, 28).toordinal(),
...         'numbers': [1,2,3],
...         'comments': [{'text':u'Comment 1'}, {'text':u'Comment 2'}]}
>>> obj = testing.SampleMongoObject(data)
>>> obj._id
ObjectId('...')

obj.title u'Mongo Object Title'

>>> obj.description
u'A Description'
>>> obj.item
<SampleSubItem u'...'>
>>> obj.item.text
u'Item'
>>> obj.numbers
[1, 2, 3]
>>> obj.comments
[<SampleSubItem u'...'>, <SampleSubItem u'...'>]
>>> tuple(obj.comments)[0].text
u'Comment 1'
>>> tuple(obj.comments)[1].text
u'Comment 2'

我们的 MongoObject 目前不提供 _aprent__ 或 __name__

>>> obj.__parent__ is None
True
>>> obj.__name__ is None
True

但是,在将 mongo 对象添加到使用 MongoObjectProperty 的内容之后,mongo 对象将被定位,并成为属性名称作为 _field 值。如果对象没有提供 __name__,则相同的值也将应用于 __name__

>>> content.obj = obj
>>> obj.__parent__
<Content 42>
>>> obj.__name__
u'obj'
>>> obj.__name__
u'obj'

添加我们的 mongo 对象后,应该在我们的线程局部缓存中有一个引用

>>> pprint(LOCAL.__dict__)
{u'42:obj': <SampleMongoObject u'obj'>,
 'MongoTransactionDataManager': <m01.mongo.tm.MongoTransactionDataManager object at ...>}

MongoObject 提供了一个 _oid 属性,该属性用作 MongoDB 键。此值使用 __parent__._moid 和 mongo 对象的属性名称

>>> obj._oid == '%s:%s' % (content._moid, obj.__name__)
True
>>> obj._oid
u'42:obj'

现在检查我们是否可以再次获取 mongo 对象,并且是否仍然获取相同的值

>>> obj = content.obj
>>> obj.title
u'Mongo Object Title'
>>> obj.description
u'A Description'
>>> obj.item
<SampleSubItem u'...'>
>>> obj.item.text
u'Item'
>>> obj.numbers
[1, 2, 3]
>>> obj.comments
[<SampleSubItem u'...'>, <SampleSubItem u'...'>]
>>> tuple(obj.comments)[0].text
u'Comment 1'
>>> tuple(obj.comments)[1].text
u'Comment 2'

现在让我们提交事务,这将存储 obj 在我们的假 mongo DB 中

>>> transaction.commit()

在我们提交到 MongoDB 后,mongo 对象和我们的事务数据管理器引用应该在线程局部缓存中消失

>>> pprint(LOCAL.__dict__)
{}

现在再次检查我们的mongo对象值。如果你的内容项存储在ZODB中,你会从一个ZODB连接根获取内容项

>>> content = root['content']
>>> content
<Content 42>
>>> obj = content.obj
>>> obj
<SampleMongoObject u'obj'>
>>> obj.title
u'Mongo Object Title'
>>> obj.description
u'A Description'
>>> obj.item
<SampleSubItem u'...'>
>>> obj.item.text
u'Item'
>>> obj.numbers
[1, 2, 3]
>>> obj.comments
[<SampleSubItem u'...'>, <SampleSubItem u'...'>]
>>> tuple(obj.comments)[0].text
u'Comment 1'
>>> tuple(obj.comments)[1].text
u'Comment 2'
>>> pprint(obj.dump())
{'__name__': u'obj',
 '_field': u'obj',
 '_id': ObjectId('...'),
 '_oid': u'42:obj',
 '_type': u'SampleMongoObject',
 '_version': 1,
 'comments': [{'_id': ObjectId('...'),
               '_type': u'SampleSubItem',
               'created': datetime.datetime(...),
               'modified': None,
               'text': u'Comment 1'},
              {'_id': ObjectId('...'),
               '_type': u'SampleSubItem',
               'created': datetime.datetime(...),
               'modified': None,
               'text': u'Comment 2'}],
 'created': datetime.datetime(...),
 'date': 733831,
 'description': u'A Description',
 'item': {'_id': ObjectId('...'),
          '_type': u'SampleSubItem',
          'created': datetime.datetime(...),
          'modified': None,
          'text': u'Item'},
 'modified': datetime.datetime(...),
 'number': None,
 'numbers': [1, 2, 3],
 'removed': False,
 'title': u'Mongo Object Title'}
>>> transaction.commit()
>>> pprint(LOCAL.__dict__)
{}

现在让我们用一个新的替换现有的项,并将另一个项添加到项列表中。同时确保我们可以使用append而不是像zope小部件那样重新应用整个列表

>>> content = root['content']
>>> obj = content.obj
>>> obj.item = testing.SampleSubItem({'text': u'New Item'})
>>> newItem = testing.SampleSubItem({'text': u'New List Item'})
>>> obj.comments.append(newItem)
>>> obj.numbers.append(4)
>>> transaction.commit()

再次检查

>>> content = root['content']
>>> obj = content.obj
>>> obj.title
u'Mongo Object Title'
>>> obj.description
u'A Description'
>>> obj.item
<SampleSubItem u'...'>
>>> obj.item.text
u'New Item'
>>> obj.numbers
[1, 2, 3, 4]
>>> obj.comments
[<SampleSubItem u'...'>, <SampleSubItem u'...'>]
>>> tuple(obj.comments)[0].text
u'Comment 1'
>>> tuple(obj.comments)[1].text
u'Comment 2'

现在重新应用列表字段的完整值列表

>>> comOne = testing.SampleSubItem({'text': u'First List Item'})
>>> comTwo = testing.SampleSubItem({'text': u'Second List Item'})
>>> comments = [comOne, comTwo]
>>> obj.comments = comments
>>> obj.numbers = [1,2,3,4,5]
>>> transaction.commit()

再次检查

>>> content = root['content']
>>> obj = content.obj
>>> len(obj.comments)
2
>>> obj.comments
[<SampleSubItem u'...'>, <SampleSubItem u'...'>]
>>> len(obj.numbers)
5
>>> obj.numbers
[1, 2, 3, 4, 5]

还要检查我们是否可以删除列表项

>>> obj.numbers.remove(1)
>>> obj.numbers.remove(2)
>>> obj.comments.remove(comTwo)
>>> transaction.commit()

再次检查

>>> content = root['content']
>>> obj = content.obj
>>> len(obj.comments)
1
>>> obj.comments
[<SampleSubItem u'...'>]
>>> len(obj.numbers)
3
>>> obj.numbers
[3, 4, 5]
>>> transaction.commit()

我们也可以通过__name__从项列表中删除项

>>> content = root['content']
>>> obj = content.obj
>>> del obj.comments[comOne.__name__]
>>> transaction.commit()

再次检查

>>> content = root['content']
>>> obj = content.obj
>>> len(obj.comments)
0
>>> obj.comments
[]
>>> transaction.commit()

或者我们可以通过名称向项列表中添加项

>>> content = root['content']
>>> obj = content.obj
>>> obj.comments[comOne.__name__] = comOne
>>> transaction.commit()

再次检查

>>> content = root['content']
>>> obj = content.obj
>>> len(obj.comments)
1
>>> obj.comments
[<SampleSubItem u'...'>]
>>> transaction.commit()

覆盖率

我们的项目列表还提供以下方法

>>> obj.comments.__contains__(comOne.__name__)
True
>>> comOne.__name__ in obj.comments
True
>>> obj.comments.get(comOne.__name__)
<SampleSubItem u'...'>
>>> obj.comments.keys() == [comOne.__name__]
True
>>> obj.comments.values()
<generator object ...>
>>> tuple(obj.comments.values())
(<SampleSubItem u'...'>,)
>>> obj.comments.items()
<generator object ...>
>>> tuple(obj.comments.items())
((u'...', <SampleSubItem u'...'>),)
>>> obj.comments == obj.comments
True

让我们测试一些内部方法以提高覆盖率

>>> obj.comments._m_changed
Traceback (most recent call last):
...
AttributeError: _m_changed is a write only property
>>> obj.comments._m_changed = False
Traceback (most recent call last):
...
ValueError: Can only dispatch True to __parent__
>>> obj.comments.locate(42)

我们的简单值类型列表还提供以下方法

>>> obj.numbers.__contains__(3)
True
>>> 3 in obj.numbers
True
>>> obj.numbers == obj.numbers
True
>>> obj.numbers.pop()
5
>>> del obj.numbers[0]
>>> obj.numbers[0] = 42
>>> obj.numbers._m_changed
Traceback (most recent call last):
...
AttributeError: _m_changed is a write only property
>>> obj.numbers._m_changed = False
Traceback (most recent call last):
...
ValueError: Can only dispatch True to __parent__

在我们离开这个测试之前检查我们的线程局部缓存

>>> pprint(LOCAL.__dict__)
{}

地理定位

地理定位项可以存储地理定位,并作为子项在项中使用,提供经度和纬度。除此之外,地理定位还提供_m_changed派发概念,能够在lon/lat发生变化时通知__parent__项。项还提供ILocation以支持安全查找。字段属性负责应用__parent__和__name__。

地理定位项支持按顺序存储经度和纬度,并保留它们。

条件

在开始测试之前,检查我们的线程局部缓存是否为空,或者是否有之前的测试遗留下来的垃圾。

>>> from m01.mongo.testing import pprint
>>> from m01.mongo import LOCAL
>>> from m01.mongo.testing import reNormalizer
>>> pprint(LOCAL.__dict__)
{}

设置

首先导入一些组件

>>> import datetime
>>> import transaction
>>> import m01.mongo
>>> import m01.mongo.base
>>> import m01.mongo.geo
>>> import m01.mongo.container
>>> from m01.mongo import interfaces
>>> from m01.mongo import testing

我们还需要一个应用程序根对象。让我们定义一个静态 MongoContainer 作为我们的应用程序数据库根项。

>>> class MongoRoot(m01.mongo.container.MongoContainer):
...     """Mongo application root"""
...
...     _id = m01.mongo.getObjectId(0)
...
...     def __init__(self):
...         pass
...
...     @property
...     def collection(self):
...         return testing.getRootItems()
...
...     @property
...     def cacheKey(self):
...         return 'root'
...
...     def load(self, data):
...         """Load data into the right mongo item."""
...         return testing.GeoSample(data)
...
...     def __repr__(self):
...         return '<%s %s>' % (self.__class__.__name__, self._id)

以下方法允许我们生成新的 MongoRoot 实例。这允许我们展示我们如何生成不同的根项,就像我们在服务器重启时做的那样。

>>> def getRoot():
...     return MongoRoot()

这是我们数据库根项

>>> root = getRoot()
>>> root
<MongoRoot 000000000000000000000000>
>>> root._id
ObjectId('000000000000000000000000')

索引

首先设置索引

>>> collection = testing.getRootItems()
>>> from pymongo import GEO2D
>>> collection.create_index([('lonlat', GEO2D)])
u'lonlat_2d'

地理样本

如你所见,我们可以在lon/lat值的列表中或lon/lat字典中初始化一个地理定位

>>> data = {'name': u'sample', 'lonlat': {'lon': 1, 'lat': 3}}
>>> sample = testing.GeoSample(data)
>>> sample.lonlat
<GeoLocation lon:1.0, lat:3.0>
>>> data = {'name': u'sample', 'lonlat': [1, 3]}
>>> sample = testing.GeoSample(data)
>>> sample.lonlat
<GeoLocation lon:1.0, lat:3.0>
>>> root[u'sample'] = sample
>>> transaction.commit()

让我们检查我们的MongoDB中的项

>>> data = collection.find_one({'name': 'sample'})
>>> reNormalizer.pprint(data)
{u'__name__': u'sample',
 u'_id': ObjectId('...'),
 u'_pid': ObjectId('...'),
 u'_type': u'GeoSample',
 u'_version': 1,
 u'created': datetime.datetime(..., tzinfo=UTC),
 u'lonlat': [1.0, 3.0],
 u'modified': datetime.datetime(..., tzinfo=UTC),
 u'name': u'sample'}

我们还可以将地理定位作为lonlat数据使用

>>> geo = m01.mongo.geo.GeoLocation({u'lat': 4, u'lon': 2})
>>> data = {'name': u'sample2', 'lonlat': geo}
>>> sample2 = testing.GeoSample(data)
>>> root[u'sample2'] = sample2
>>> transaction.commit()
>>> data = collection.find_one({'name': 'sample2'})
>>> reNormalizer.pprint(data)
{u'__name__': u'sample2',
 u'_id': ObjectId('...'),
 u'_pid': ObjectId('...'),
 u'_type': u'GeoSample',
 u'_version': 1,
 u'created': datetime.datetime(..., tzinfo=UTC),
 u'lonlat': {u'lat': 4.0, u'lon': 2.0},
 u'modified': datetime.datetime(..., tzinfo=UTC),
 u'name': u'sample2'}

我们还可以将地理定位设置为lonlat值

>>> sample2 = root[u'sample2']
>>> geo = m01.mongo.geo.GeoLocation({'lon': 4, 'lat': 6})
>>> sample2.lonlat = geo
>>> transaction.commit()
>>> data = collection.find_one({'name': 'sample2'})
>>> reNormalizer.pprint(data)
{u'__name__': u'sample2',
 u'_id': ObjectId('...'),
 u'_pid': ObjectId('...'),
 u'_type': u'GeoSample',
 u'_version': 2,
 u'created': datetime.datetime(..., tzinfo=UTC),
 u'lonlat': {u'lat': 6.0, u'lon': 4.0},
 u'modified': datetime.datetime(..., tzinfo=UTC),
 u'name': u'sample2'}

拆解

>>> from m01.mongo import clearThreadLocalCache
>>> clearThreadLocalCache()

如您所见,我们的缓存项被删除

>>> from m01.mongo import LOCAL
>>> pprint(LOCAL.__dict__)
{}

地理点

地理点项可以存储地理定位,并作为子项在项中使用,提供经度和纬度以及类型。除此之外,地理点还提供_m_changed派发概念,能够在lon/lat发生变化时通知__parent__项。项还提供ILocation以支持安全查找。MongoGeoPointProperty字段属性负责应用__parent__和__name__并使用正确的类工厂。

地理点项支持按顺序存储经度和纬度,并保留它们。

条件

在开始测试之前,检查我们的线程局部缓存是否为空,或者是否有之前的测试遗留下来的垃圾。

>>> from m01.mongo.testing import pprint
>>> from m01.mongo import LOCAL
>>> from m01.mongo.testing import reNormalizer
>>> pprint(LOCAL.__dict__)
{}

设置

首先导入一些组件

>>> import datetime
>>> import transaction
>>> import m01.mongo
>>> import m01.mongo.base
>>> import m01.mongo.geo
>>> import m01.mongo.container
>>> from m01.mongo import interfaces
>>> from m01.mongo import testing

我们还需要一个应用程序根对象。让我们定义一个静态 MongoContainer 作为我们的应用程序数据库根项。

>>> class MongoRoot(m01.mongo.container.MongoContainer):
...     """Mongo application root"""
...
...     _id = m01.mongo.getObjectId(0)
...
...     def __init__(self):
...         pass
...
...     @property
...     def collection(self):
...         return testing.getRootItems()
...
...     @property
...     def cacheKey(self):
...         return 'root'
...
...     def load(self, data):
...         """Load data into the right mongo item."""
...         return testing.GeoPointSample(data)
...
...     def __repr__(self):
...         return '<%s %s>' % (self.__class__.__name__, self._id)

以下方法允许我们生成新的 MongoRoot 实例。这允许我们展示我们如何生成不同的根项,就像我们在服务器重启时做的那样。

>>> def getRoot():
...     return MongoRoot()

这是我们数据库根项

>>> root = getRoot()
>>> root
<MongoRoot 000000000000000000000000>
>>> root._id
ObjectId('000000000000000000000000')

索引

首先设置索引

>>> collection = testing.getRootItems()
>>> from pymongo import GEOSPHERE
>>> collection.create_index([('lonlat', GEOSPHERE)])
u'lonlat_2dsphere'

地理点样本

如你所见,我们可以在lon/lat值的列表中或lon/lat字典中初始化一个地理点

>>> data = {'name': u'sample', 'lonlat': {'lon': 1, 'lat': 3}}
>>> sample = testing.GeoPointSample(data)
>>> sample.lonlat
<GeoPoint lon:1.0, lat:3.0>
>>> data = {'name': u'sample', 'lonlat': [1, 3]}
>>> sample = testing.GeoPointSample(data)
>>> sample.lonlat
<GeoPoint lon:1.0, lat:3.0>
>>> root[u'sample'] = sample
>>> transaction.commit()

让我们检查我们的MongoDB中的项

>>> data = collection.find_one({'name': 'sample'})
>>> reNormalizer.pprint(data)
{u'__name__': u'sample',
 u'_id': ObjectId('...'),
 u'_pid': ObjectId('...'),
 u'_type': u'GeoPointSample',
 u'_version': 1,
 u'created': datetime.datetime(..., tzinfo=UTC),
 u'lonlat': {u'coordinates': [1.0, 3.0], u'type': u'Point'},
 u'modified': datetime.datetime(..., tzinfo=UTC),
 u'name': u'sample'}

我们还可以将地理点作为lonlat数据使用

>>> geo = m01.mongo.geo.GeoPoint({u'lat': 4, u'lon': 2})
>>> data = {'name': u'sample2', 'lonlat': geo}
>>> sample2 = testing.GeoPointSample(data)
>>> root[u'sample2'] = sample2
>>> transaction.commit()
>>> data = collection.find_one({'name': 'sample2'})
>>> reNormalizer.pprint(data)
{u'__name__': u'sample2',
 u'_id': ObjectId('...'),
 u'_pid': ObjectId('...'),
 u'_type': u'GeoPointSample',
 u'_version': 1,
 u'created': datetime.datetime(..., tzinfo=UTC),
 u'lonlat': {u'coordinates': [2.0, 4.0], u'type': u'Point'},
 u'modified': datetime.datetime(..., tzinfo=UTC),
 u'name': u'sample2'}

我们还可以将地理点设置为lonlat值

>>> sample2 = root[u'sample2']
>>> geo = m01.mongo.geo.GeoPoint({'lon': 4, 'lat': 6})
>>> sample2.lonlat = geo
>>> transaction.commit()
>>> data = collection.find_one({'name': 'sample2'})
>>> reNormalizer.pprint(data)
{u'__name__': u'sample2',
 u'_id': ObjectId('...'),
 u'_pid': ObjectId('...'),
 u'_type': u'GeoPointSample',
 u'_version': 2,
 u'created': datetime.datetime(..., tzinfo=UTC),
 u'lonlat': {u'coordinates': [4.0, 6.0], u'type': u'Point'},
 u'modified': datetime.datetime(..., tzinfo=UTC),
 u'name': u'sample2'}

索引

>>> pprint(collection.index_information())
{'_id_': {'key': [('_id', 1)], 'ns': 'm01_mongo_testing.items', 'v': 1},
 'lonlat_2dsphere': {'2dsphereIndexVersion': 2,
                      'key': [('lonlat', '2dsphere')],
                      'ns': 'm01_mongo_testing.items',
                      'v': 1}}

搜索

让我们测试一些地理定位搜索查询,并确保我们的lon/lat顺序在mongodb往返过程中将适合并得到保留。

现在搜索地理定位

>>> def printFind(collection, query):
...     for data in collection.find(query):
...         reNormalizer.pprint(data)

使用地理空间索引,我们可以找到另一个点内的文档

>>> point = {"type": "Polygon",
...          "coordinates": [[[0,0], [0,6], [2,6], [2,0], [0,0]]]}
>>> query = {"lonlat": {"$within": {"$geometry": point}}}
>>> printFind(collection, query)
{u'__name__': u'sample',
 u'_id': ObjectId('...'),
 u'_pid': ObjectId('...'),
 u'_type': u'GeoPointSample',
 u'_version': 1,
 u'created': datetime.datetime(..., tzinfo=UTC),
 u'lonlat': {u'coordinates': [1.0, 3.0], u'type': u'Point'},
 u'modified': datetime.datetime(..., tzinfo=UTC),
 u'name': u'sample'}

使用地理空间索引,我们可以找到另一个点附近的文档

>>> point = {'type': 'Point', 'coordinates': [0, 2]}
>>> printFind(collection, {'lonlat': {'$near': {'$geometry': point}}})
{u'__name__': u'sample',
 u'_id': ObjectId('...'),
 u'_pid': ObjectId('...'),
 u'_type': u'GeoPointSample',
 u'_version': 1,
 u'created': datetime.datetime(..., tzinfo=UTC),
 u'lonlat': {u'coordinates': [1.0, 3.0], u'type': u'Point'},
 u'modified': datetime.datetime(..., tzinfo=UTC),
 u'name': u'sample'}
{u'__name__': u'sample2',
 u'_id': ObjectId('...'),
 u'_pid': ObjectId('...'),
 u'_type': u'GeoPointSample',
 u'_version': 2,
 u'created': datetime.datetime(..., tzinfo=UTC),
 u'lonlat': {u'coordinates': [4.0, 6.0], u'type': u'Point'},
 u'modified': datetime.datetime(..., tzinfo=UTC),
 u'name': u'sample2'}

也可以查询给定矩形(由左下角和右上角坐标指定)内的所有项

>>> printFind(collection, {'lonlat': {'$within': {'$box': [[1,2], [2,3]]}}})
{u'__name__': u'sample',
 u'_id': ObjectId('...'),
 u'_pid': ObjectId('...'),
 u'_type': u'GeoPointSample',
 u'_version': 1,
 u'created': datetime.datetime(..., tzinfo=UTC),
 u'lonlat': {u'coordinates': [1.0, 3.0], u'type': u'Point'},
 u'modified': datetime.datetime(..., tzinfo=UTC),
 u'name': u'sample'}

如你所见,如果我们使用错误的lon/lat顺序(lat/lon),我们不会得到任何值

>>> printFind(collection, {'lonlat': {'$within': {'$box': [[2,1], [3,2]]}}})

我们还可以搜索一个圆(由中心点和半径指定)

>>> printFind(collection, {'lonlat': {'$within': {'$center': [[0, 0], 2]}}})
>>> printFind(collection, {'lonlat': {'$within': {'$center': [[0, 0], 4]}}})
{u'__name__': u'sample',
 u'_id': ObjectId('...'),
 u'_pid': ObjectId('...'),
 u'_type': u'GeoPointSample',
 u'_version': 1,
 u'created': datetime.datetime(..., tzinfo=UTC),
 u'lonlat': {u'coordinates': [1.0, 3.0], u'type': u'Point'},
 u'modified': datetime.datetime(..., tzinfo=UTC),
 u'name': u'sample'}
>>> printFind(collection, {'lonlat': {'$within': {'$center': [[0, 0], 10]}}})
{u'__name__': u'sample',
 u'_id': ObjectId('...'),
 u'_pid': ObjectId('...'),
 u'_type': u'GeoPointSample',
 u'_version': 1,
 u'created': datetime.datetime(..., tzinfo=UTC),
 u'lonlat': {u'coordinates': [1.0, 3.0], u'type': u'Point'},
 u'modified': datetime.datetime(..., tzinfo=UTC),
 u'name': u'sample'}
{u'__name__': u'sample2',
 u'_id': ObjectId('...'),
 u'_pid': ObjectId('...'),
 u'_type': u'GeoPointSample',
 u'_version': 2,
 u'created': datetime.datetime(..., tzinfo=UTC),
 u'lonlat': {u'coordinates': [4.0, 6.0], u'type': u'Point'},
 u'modified': datetime.datetime(..., tzinfo=UTC),
 u'name': u'sample2'}

也要检查lat/lon顺序是否重要

>>> printFind(collection, {'lonlat': {'$within': {'$center': [[1, 2], 1]}}})
{u'__name__': u'sample',
 u'_id': ObjectId('...'),
 u'_pid': ObjectId('...'),
 u'_type': u'GeoPointSample',
 u'_version': 1,
 u'created': datetime.datetime(..., tzinfo=UTC),
 u'lonlat': {u'coordinates': [1.0, 3.0], u'type': u'Point'},
 u'modified': datetime.datetime(..., tzinfo=UTC),
 u'name': u'sample'}
>>> printFind(collection, {'lonlat': {'$within': {'$center': [[2, 1], 1]}}})

并且检查我们是否可以通过使用浮点数来存储真实的lon/lat值

>>> data = {'name': u'sample', 'lonlat': {'lon': 20.123, 'lat': 29.123}}
>>> sample3 = testing.GeoPointSample(data)
>>> root[u'sample3'] = sample3
>>> transaction.commit()
>>> printFind(collection, {'lonlat': {'$within': {'$center': [[25, 25], 4]}}})
>>> printFind(collection, {'lonlat': {'$within': {'$center': [[25, 25], 10]}}})
{u'__name__': u'sample3',
 u'_id': ObjectId('...'),
 u'_pid': ObjectId('...'),
 u'_type': u'GeoPointSample',
 u'_version': 1,
 u'created': datetime.datetime(..., tzinfo=UTC),
 u'lonlat': {u'coordinates': [20.123, 29.123], u'type': u'Point'},
 u'modified': datetime.datetime(..., tzinfo=UTC),
 u'name': u'sample'}

拆解

>>> from m01.mongo import clearThreadLocalCache
>>> clearThreadLocalCache()

如您所见,我们的缓存项被删除

>>> from m01.mongo import LOCAL
>>> pprint(LOCAL.__dict__)
{}

批量处理

MongoStorage和MongoContainer使用的MongoMappingBase基类可以返回批量数据或项以及批量信息。

注意;此测试在级别2运行,因为它使用了一个工作的MongoDB。这是必要的,因为我们喜欢在MongoDB中测试实际的排序和限制功能。

条件

在我们开始测试之前,检查我们的线程局部缓存是否为空或是否有来自以前测试的残留垃圾

>>> from m01.mongo.testing import pprint
>>> from m01.mongo import LOCAL
>>> pprint(LOCAL.__dict__)
{}

设置

首先导入一些组件

>>> import datetime
>>> import transaction
>>> from m01.mongo import testing

设置

现在我们可以在数据库中添加一个MongoStorage。让我们只使用一个简单的字典作为数据库根

>>> root = {}
>>> storage = testing.SampleStorage()
>>> root['storage'] = storage
>>> transaction.commit()

现在让我们添加1000个MongoItems

>>> storage = root['storage']
>>> for i in range(1000):
...     data = {u'title': u'Title %i' % i,
...             u'description': u'Description %i' % i,
...             u'number': i}
...     item = testing.SampleStorageItem(data)
...     __name__ = storage.add(item)
>>> transaction.commit()

在我们提交到 MongoDB 后,mongo 对象和我们的事务数据管理器引用应该在线程局部缓存中消失

>>> from m01.mongo import LOCAL
>>> pprint(LOCAL.__dict__)
{}

如你所见,我们的集合包含1000个项

>>> storage = root['storage']
>>> len(storage)
1000

批量处理

注意,此方法不返回项,它只返回MongoDB数据。这就是你应该使用的方法。如果你需要一个真实MongoItem的列表,这会变得复杂,因为我们可能会在我们的本地缓存中删除MongoDB不知道的标记项。

让我们获取批处理信息

>>> storage.getBatchData()
(<...Cursor object at ...>, 1, 40, 1000)

如你所见,我们有一个包含Mongo数据、起始索引、项总数和页面计数的游标。注意,第一页从1(一)开始,而不是0。让我们展示另一个具有不同值的示例

>>> storage.getBatchData(page=5, size=10)
(<...Cursor object at ...>, 5, 100, 1000)

如你所见,我们可以迭代我们的游标

>>> cursor, page, total, pages = storage.getBatchData(page=1, size=3)
>>> pprint(tuple(cursor))
({'__name__': '...',
  '_id': ObjectId('...'),
  '_pid': None,
  '_type': 'SampleStorageItem',
  '_version': 1,
  'comments': [],
  'created': datetime.datetime(..., tzinfo=UTC),
  'date': None,
  'description': 'Description ...',
  'item': None,
  'modified': datetime.datetime(..., tzinfo=UTC),
  'number': ...,
  'numbers': [],
  'title': 'Title ...'},
 {'__name__': '...',
  '_id': ObjectId('...'),
  '_pid': None,
  '_type': 'SampleStorageItem',
  '_version': 1,
  'comments': [],
  'created': datetime.datetime(..., tzinfo=UTC),
  'date': None,
  'description': 'Description ...',
  'item': None,
  'modified': datetime.datetime(..., tzinfo=UTC),
  'number': ...,
  'numbers': [],
  'title': 'Title ...'},
 {'__name__': '...',
  '_id': ObjectId('...'),
  '_pid': None,
  '_type': 'SampleStorageItem',
  '_version': 1,
  'comments': [],
  'created': datetime.datetime(..., tzinfo=UTC),
  'date': None,
  'description': 'Description ...',
  'item': None,
  'modified': datetime.datetime(..., tzinfo=UTC),
  'number': ...,
  'numbers': [],
  'title': 'Title ...'})

如你所见,游标计算项的总数

>>> cursor.count()
1000

但我们可以通过使用True作为参数来强制根据limit和skip参数来计数结果

>>> cursor.count(True)
3

如你所见,批处理或任何其他对象查找都会在我们的线程局部缓存中留下项。我们可以使用我们的线程局部缓存清理事件处理器,通常注册为EndRequestEvent订阅者

>>> from m01.mongo import LOCAL
>>> pprint(LOCAL.__dict__)
{u'm01_mongo_testing.test...': {'added': {}, 'removed': {}}}

让我们使用我们的订阅者

>>> from m01.mongo import clearThreadLocalCache
>>> clearThreadLocalCache()

如您所见,我们的缓存项被删除

>>> from m01.mongo import LOCAL
>>> pprint(LOCAL.__dict__)
{}

顺序

批处理的一个重要部分是排序。如你所见,我们可以限制批次大小,并从序列中获取数据的一个片段。在我们将数据切片到批次之前,对数据进行排序非常重要。让我们基于我们的可排序数字值和最低值优先的排序顺序来测试它是否有效。从页面=0开始

>>> cursor, page, pages, total = storage.getBatchData(page=1, size=3,
...     sortName='number', sortOrder=1)
>>> cursor
<pymongo.cursor.Cursor object at ...>
>>> page
1
>>> pages
334
>>> total
1000

当排序正确时,第一个项的数字值应该是0(零)

>>> pprint(tuple(cursor))
({u'__name__': u'...',
  u'_id': ObjectId('...'),
  '_pid': None,
  u'_type': u'SampleStorageItem',
  u'_version': 1,
  u'comments': [],
  u'created': datetime.datetime(..., tzinfo=UTC),
  'date': None,
  u'description': u'Description 0',
  'item': None,
  u'modified': datetime.datetime(..., tzinfo=UTC),
  u'number': 0,
  u'numbers': [],
  u'title': u'Title 0'},
 {u'__name__': u'...',
  u'_id': ObjectId('...'),
  '_pid': None,
  u'_type': u'SampleStorageItem',
  u'_version': 1,
  u'comments': [],
  u'created': datetime.datetime(..., tzinfo=UTC),
  'date': None,
  u'description': u'Description 1',
  'item': None,
  u'modified': datetime.datetime(..., tzinfo=UTC),
  u'number': 1,
  u'numbers': [],
  u'title': u'Title 1'},
 {u'__name__': u'...',
  u'_id': ObjectId('...'),
  '_pid': None,
  u'_type': u'SampleStorageItem',
  u'_version': 1,
  u'comments': [],
  u'created': datetime.datetime(..., tzinfo=UTC),
  'date': None,
  u'description': u'Description 2',
  'item': None,
  u'modified': datetime.datetime(..., tzinfo=UTC),
  u'number': 2,
  u'numbers': [],
  u'title': u'Title 2'})

第二页(页面=1)应该以数字==3开始

>>> cursor, page, pages, total = storage.getBatchData(page=2, size=3,
...     sortName='number', sortOrder=1)
>>> pprint(tuple(cursor))
({u'__name__': u'...',
  u'_id': ObjectId('...'),
  '_pid': None,
  u'_type': u'SampleStorageItem',
  u'_version': 1,
  u'comments': [],
  u'created': datetime.datetime(..., tzinfo=UTC),
  'date': None,
  u'description': u'Description 3',
  'item': None,
  u'modified': datetime.datetime(..., tzinfo=UTC),
  u'number': 3,
  u'numbers': [],
  u'title': u'Title 3'},
 {u'__name__': u'...',
  u'_id': ObjectId('...'),
  '_pid': None,
  u'_type': u'SampleStorageItem',
  u'_version': 1,
  u'comments': [],
  u'created': datetime.datetime(..., tzinfo=UTC),
  'date': None,
  u'description': u'Description 4',
  'item': None,
  u'modified': datetime.datetime(..., tzinfo=UTC),
  u'number': 4,
  u'numbers': [],
  u'title': u'Title 4'},
 {u'__name__': u'...',
  u'_id': ObjectId('...'),
  '_pid': None,
  u'_type': u'SampleStorageItem',
  u'_version': 1,
  u'comments': [],
  u'created': datetime.datetime(..., tzinfo=UTC),
  'date': None,
  u'description': u'Description 5',
  'item': None,
  u'modified': datetime.datetime(..., tzinfo=UTC),
  u'number': 5,
  u'numbers': [],
  u'title': u'Title 5'})

如你所见,你的页面大小是334。让我们展示这个批次切片。这个批次中的项应该有数字==999,但请注意

>>> pages
334
>>> cursor, page, total, pages = storage.getBatchData(page=334, size=3,
...     sortName='number', sortOrder=1)
>>> pprint(tuple(cursor))
({u'__name__': u'...',
  u'_id': ObjectId('...'),
  '_pid': None,
  u'_type': u'SampleStorageItem',
  u'_version': 1,
  u'comments': [],
  u'created': datetime.datetime(..., tzinfo=UTC),
  'date': None,
  u'description': u'Description 999',
  'item': None,
  u'modified': datetime.datetime(..., tzinfo=UTC),
  u'number': 999,
  u'numbers': [],
  u'title': u'Title 999'},)

拆除

调用事务提交,这将清理我们的本地缓存

>>> transaction.commit()

再次,清除线程局部缓存

>>> clearThreadLocalCache()

在我们离开这个测试之前检查我们的线程局部缓存

>>> pprint(LOCAL.__dict__)
{}

测试

让我们测试一些测试方法。

>>> import re
>>> import datetime
>>> import bson.tz_util
>>> import m01.mongo
>>> import m01.mongo.testing
>>> from m01.mongo.testing import pprint

RENormalizer

RENORMALIZER能够规范化文本并产生可比较的输出。你可以通过输入、输出表达式列表来设置RENORMALIZER。这对于导出包含日期或其他非简单可复现数据的mongodb数据非常有用。在单元测试比较输出之前,这样的导出结果可以规范化。还可以参见zope.testing.renormalizing,它使用相同的模式作为doctest检查器。

>>> normalizer = m01.mongo.testing.RENormalizer([
...    (re.compile('[0-9]*[.][0-9]* seconds'), '... seconds'),
...    (re.compile('at 0x[0-9a-f]+'), 'at ...'),
...    ])
>>> text = """
... <object object at 0xb7f14438>
... completed in 1.234 seconds.
... ...
... <object object at 0xb7f14450>
... completed in 1.234 seconds.
... """
>>> print normalizer(text)
<BLANKLINE>
<object object at ...>
completed in ... seconds.
...
<object object at ...>
completed in ... seconds.
<BLANKLINE>

现在让我们测试一些与mongodb相关的功能

>>> from bson.dbref import DBRef
>>> from bson.min_key import MinKey
>>> from bson.max_key import MaxKey
>>> from bson.objectid import ObjectId
>>> from bson.timestamp import Timestamp
>>> oid = m01.mongo.getObjectId(42)
>>> oid
ObjectId('0000002a0000000000000000')
>>> data = {'oid': oid,
...         'dbref': DBRef("foo", 5, "db"),
...         'date': datetime.datetime(2011, 5, 7, 1, 12),
...         'utc': datetime.datetime(2011, 5, 7, 1, 12, tzinfo=bson.tz_util.utc),
...         'min': MinKey(),
...         'max': MaxKey(),
...         'timestamp': Timestamp(4, 13),
...         're': re.compile("a*b", re.IGNORECASE),
...         'string': 'string',
...         'unicode': u'unicode',
...         'int': 42}

现在让我们美化打印数据

>>> pprint(data)
{'date': datetime.datetime(...),
 'dbref': DBRef('foo', 5, 'db'),
 'int': 42,
 'max': MaxKey(),
 'min': MinKey(),
 'oid': ObjectId('...'),
 're': <_sre.SRE_Pattern object at ...>,
 'string': 'string',
 'timestamp': Timestamp('...'),
 'unicode': 'unicode',
 'utc': datetime.datetime(..., tzinfo=UTC)}

reNormalizer

如你所见,我们预定义的reNormalizer将使用我们提供的模式转换值

>>> import m01.mongo.testing
>>> res = m01.mongo.testing.reNormalizer(data)
>>> print res
{'date': datetime.datetime(...),
 'dbref': DBRef('foo', 5, 'db'),
 'int': 42,
 'max': MaxKey(),
 'min': MinKey(),
 'oid': ObjectId('...'),
 're': <_sre.SRE_Pattern object at ...>,
 'string': 'string',
 'timestamp': Timestamp('...'),
 'unicode': u'unicode',
 'utc': datetime.datetime(..., tzinfo=UTC)}

pprint

>>> m01.mongo.testing.reNormalizer.pprint(data)
{'date': datetime.datetime(...),
 'dbref': DBRef('foo', 5, 'db'),
 'int': 42,
 'max': MaxKey(),
 'min': MinKey(),
 'oid': ObjectId('...'),
 're': <_sre.SRE_Pattern object at ...>,
 'string': 'string',
 'timestamp': Timestamp('...'),
 'unicode': u'unicode',
 'utc': datetime.datetime(..., tzinfo=UTC)}

UTC

PyMongo库提供了一个自定义的UTC实现,包括pickle支持,用于deepcopy。让我们测试这个实现是否有效,并将我们的自定义时区替换为bson.tz_info.utc

>>> dt = data['utc']
>>> dt
datetime.datetime(2011, 5, 7, 1, 12, tzinfo=UTC)
>>> import copy
>>> copy.deepcopy(dt)
datetime.datetime(2011, 5, 7, 1, 12, tzinfo=UTC)

加快你的实现

由于并非每个策略都适用于每个应用程序,并且我们无法在此包中实现所有概念,我们将在此列出一些改进。

值和项

MongoContainers和MongoStorage实现将在values和items方法中加载所有数据,即使我们已经在我们的线程局部缓存中缓存了它们。这里有一个优化方法,如果需要加载大量数据,可以加以使用。

MongoMappingBase.values的原实现如下

def values(self):
    # join transaction handling
    self.ensureTransaction()
    for data in self.doFind(self.collection):
        __name__ = data['__name__']
        if __name__ in self._cache_removed:
            # skip removed items
            continue
        obj = self._cache_loaded.get(__name__)
        if obj is None:
            try:
                # load, locate and cache if not cached
                obj = self.doLoad(data)
            except (KeyError, TypeError):
                continue
        yield obj
    # also return items not stored in MongoDB yet
    for k, v in self._cache_added.items():
        yield v

如果你想防止加载所有数据,你可能只需加载键,并为尚未缓存的项查找数据。这将减少网络流量,可能看起来像

def values(self):
    # join transaction handling
    self.ensureTransaction()
    # only get __name__ and _id
    for data in self.doFind(self.collection, {}, ['__name__', '_id']):
        __name__ = data['__name__']
        if __name__ in self._cache_removed:
            # skip removed items
            continue
        obj = self._cache_loaded.get(__name__)
        if obj is None:
            try:
                # now we can load data from mongo
                d = self.doFindOne(self.collection, data)
                # load, locate and cache if not cached
                obj = self.doLoad(d)
            except (KeyError, TypeError):
                continue
        yield obj
    # also return items not stored in MongoDB yet
    for k, v in self._cache_added.items():
        yield v

注意:相同的概念可用于items方法。

注意:我不建议在任何时候调用keys、values或items来处理大型集合。看看我们实现的批处理概念。getBatchData方法可能是处理大量数据时需要使用的方法。

高级转换器

以下类展示了高级实现,能够转换嵌套数据结构。

通常,转换器可以转换属性值。如果属性值是一个包含另一个项目列表的列表,那么您需要使用另一个能够转换这种嵌套结构的转换器。但通常,这是第一级项目的责任来转换其值。这就是为什么我们没有默认实现这个概念的原因。

记住,默认转换器定义看起来像这样

def itemConverter(value):
    _type = value.get('_type')
    if _type == 'Car':
        return Car
    if _type == 'House':
        return House
    else:
        return value

该类定义了一些类似的东西

converters = {'myItems': itemConverter}

我们的高级转换器示例可以转换嵌套数据结构,看起来像这样

def toCar(value):
    return Car(value)

converters = {'myItems': {'House': toHouse, 'Car': toCar}}

class AdvancedConverter(object):

    converters = {} # attr-name/converter or {_type:converter}
    def convert(self, key, value):
        """This convert method knows how to handle nested converters."""
        converter = self.converters.get(key)
        if converter is not None:
            if isinstance(converter, dict):
                if isinstance(value, (list, tuple)):
                    res = []
                    for o in value:
                        if isinstance(o, dict):
                            _type = o.get('_type')
                            if _type is not None:
                                converter = converter.get(_type)
                                value = converter(o)
                        res.append(value)
                    value = res
                elif isinstance(value, dict):
                    _type = o.get('_type')
                    if _type is not None:
                        converter = converter.get(_type)
                        value = converter(value)
                else:
                    value = converter(value)
            else:
                if isinstance(value, (list, tuple)):
                    # convert list values
                    value = [converter(d) for d in value]
                else:
                    # convert simple values
                    value = converter(value)
        return value

我相信如果您理解了我们的实现,您会发现很多改进和编写您自己的特殊方法的空间,这些方法可以针对您的用例做正确的事情。

变更

3.3.5 (2024-01-09)

  • bugfix:调整MONGODB_TLS_CERTIFICATE_SELECTOR设置。使用MONGODB_TLS_ALLOW_INVALID_HOSTNAME选项是错误的。

  • 添加了修复datetime now方法的钩子。使用钩子设置创建和修改日期。这允许我们在复杂的测试设置中使用日期钩子。

3.3.4 (2021-03-26)

  • bugfix:从IMongoSubObject接口中删除_version和ICreatedModified。_version、modified和create不一定总是适用于子对象,因为这些信息可以在父对象中找到。您可以在自己的实现中自由支持这些附加信息。

3.3.3 (2021-03-23)

  • feature:实现了MongoSubObject对象和MongoSubObjectProperty。MongoSubObjectProperty提供了一个转换器和工厂,可以用来将对象作为属性应用。这允许在属性名中遍历对象。与将数据存储在自己的集合中的MongoObject相比,MongoSubobject实现将数据存储在父对象中。

3.3.2 (2021-01-14)

  • 为pymongo客户端设置添加TLS选项

  • bugfix:修复MongoItemsData中的order方法与set的比较。现有的实现使用pop with values而不是索引来验证新的顺序名称。

  • 添加了针对bugfix的测试

3.3.1 (2020-04-22)

  • bugfix:注册MongoListData类,允许接口IMongoListData。这允许像内置类型一样访问内部实现。注意:使用此实现的对象属性仍然是受保护的。我们只是让我们的实例表现得像内置的简单Python类型。

3.3.0 (2018-02-04)

  • 使用新的p01.env包设置pymongo客户端环境

3.2.3 (2018-02-04)

  • bugfix:从mongo客户端测试设置中删除FakeMongoConnectionPool

  • 将MONGODB_CONNECT设置为默认False,因为客户端设置对于测试设置来说太耗时了。如果您需要在应用程序启动时连接,请将MONGODB_CONNECT添加到您的os环境中。

3.2.2 (2018-01-29)

  • bugfix:修复超时毫秒数和MONGODB_REVOCATION_LIST attr的使用

3.2.1 (2018-01-29)

  • bugfix:将MONGODB_SERVER_SELECTION_TIMEOUT乘以1000,因为它用作毫秒

3.2.0 (2018-01-29)

  • feature:实现了基于环境变量和默认settings.py文件的pymongo客户端设置

3.1.0 (2017-01-22)

  • bugfix:确保我们在Python对象中给定值为None时覆盖现有的mongodb值。旧版本没有使用None覆盖现有值。新实现将使用默认模式值作为mongodb值,即使默认值为None。注意,这将破坏现有的测试输出。

  • bugfix:修复性能测试设置,条件包含ZODB进行性能测试。通过setup.py中的extras_require支持。

3.0.0 (2015-11-11)

  • 使用3.0.0作为包版本并反映pymongo > 3.0.0兼容性。

  • feature:更改内部doFind、doInsert和doRemove方法,删除旧方法参数,如safe等。

  • feature:反映pymongo > 3.0.0中的更改。用close方法代替disconnect,如MongoClient所做的那样。

  • 从代码中删除MongoConnectionPool,用MongoClient替换它们。由于pymongo是线程安全的,因此不需要线程安全的连接池。同样,在测试代码中用MongoClient替换MongoConnection。

  • 从m01.mongofake切换到m01.fake,包括对pymongo >= 3.0.0的支持

  • 从映射基类中删除write_concern选项。MongoClient应该定义正确的写关注。

1.0.0 (2015-03-17)

  • 优化对象设置中的AttributeError处理。额外捕获ValueError和zope.interface.Invalid,并带有详细属性和值信息的AttributeError。

0.11.1 (2014-04-10)

  • 特性:将Mongo客户端的max_pool_size值从10MB更改为100MB,以反映pymongo >= 2.6的变化。

0.11.0 (2013-1-23)

  • 实现用于2dsphere地理空间索引的GeoPoint。还提供了一个MongoGeoPointProperty,可以创建此类GeoPoint项。

0.10.2 (2013-01-04)

  • 在MongoObject中支持_m_insert_write_concern、_m_update_write_concern和_m_remove_write_concern。

0.10.1 (2012-12-19)

  • 特性:实现了支持时区信息属性(tzinfo=UTC)的MongoDatetime模式字段。

0.10.0 (2012-12-16)

  • 从Connection切换到MongoClient(自pymongo 2.4以来推荐)。用写关注选项替换了safe。现在默认情况下pymongo将使用安全写入。

  • 在MongoConnectionPool中将MongoClient用作工厂。我们没有重命名MongoConnectionPool类,我们将保持原样。我们也没有重命名IMongoConnectionPool接口。

  • 用_m_insert_write_concern、_m_update_write_concern和_m_remove_write_concern替换了_m_safe_insert、_m_safe_update和_m_safe_remove。这个新映射基类选项是一个空字典,可以被新的写关注设置替换。默认的空字典将强制使用连接中定义的写关注。

0.9.0 (2012-12-10)

  • 使用m01.mongofake作为伪造的mongodb、集合及其相关内容。

0.8.0 (2012-11-18)

  • 错误修正:为导出数据添加缺少的安全声明。

  • 切换到bson导入。

  • 根据pymongo 2.3反映测试输出的变化。

  • 删除p01.i18n包依赖。

  • 改进,防止将具有相同值的项标记为已更改。

  • 改进排序,支持将键或列表作为sortName,如果提供sortName,允许跳过sortOrder。

  • 添加了MANIFEST.in文件。

0.7.0 (2012-05-22)

  • 错误修正:FakeCollection.remove:使用find查找文档。

  • 使用SON保留查询过滤器和导出方法的顺序。

  • 实现m01.mongo.dictify,它可以递归地将所有的bson.son.SON替换为普通的字典实例。

0.6.2 (2012-03-12)

  • 错误修正:遗漏了一个方法。

0.6.1 (2012-03-12)

  • 错误修正:在FakeMongoConnection的__call__方法中返回self。这使得实例的行为类似于原始pymongo Connection类的__init__方法。

  • 特性:为FakeMongoConnection.find()添加了sort参数。

0.6.0 (2012-01-17)

  • 错误修正:在查询过程中,如果文档中缺少特定的键,则始终忽略该文档。

  • 错误修正:正确地在UTC中生成对象ID。它依赖于GMT+1(即Roger的时区)。

  • 错误修正:允许将None用作MongoDateProperty的值。

  • 错误修正:如果给定,在MongoSubItem的__init__方法中设置__parent__。

  • 实现_m_initialized作为标记,以找出何时需要跟踪更改的属性。

  • 在MongoListData和MongoItemsData中实现了clear方法,允许一次性删除序列项,而无需从序列中逐个弹出项。

  • 改进MongoObject实现,实现了_field,该字段存储MongoObject存储的父字段名。还调整了MongoObjectProperty,并通过应用先前存储的__name__作为_field(如果未提供)支持向后兼容。这个新的_field和__name__分离使我们能够使用显式名称,例如_id或自定义名称,我们可以通过遍历器或其他容器实现(如)遍历到MongoObject。

  • 在FakeCollection中实现了__getattr__。这允许像pymongo一样获取子集合,它是gridfs概念的一部分。

0.5.5 (2011-10-14)

  • 实现使用点符号的过滤。

0.5.4 (2011-09-27)

  • 修复:真实的MongoDB接受元组作为find的fields参数。

0.5.3 (2011-09-20)

  • 修复最小过滤表达式(Albertas)。

0.5.2 (2011-09-19)

  • 添加最小过滤表达式(Albertas)。

  • 将创建和修改移动到自己的接口ICreatedModified中。

  • 实现简单的通用初始地理定位支持。

0.5.1 (2011-09-09)

  • 修复性能测试。

  • 添加database_names和collection_names。

0.5.0 (2011-08-19)

  • 初始发布。

项目详情


下载文件

下载适合您平台文件的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分布

m01.mongo-3.3.5.tar.gz (104.6 kB 查看哈希值)

上传时间

支持者

AWSAWS云计算和安全赞助商DatadogDatadog监控FastlyFastlyCDNGoogleGoogle下载分析MicrosoftMicrosoftPSF赞助商PingdomPingdom监控SentrySentry错误日志StatusPageStatusPage状态页面