跳转到主要内容

Python接口用于访问Salesforce.com批量API。

项目描述

travis-badge

Salesforce Bulk

访问异步Salesforce.com批量API的Python客户端库。

安装

pip install salesforce-bulk

身份验证

要访问批量API,您需要将用户认证到Salesforce中。最简单的方法是只需提供用户名密码安全令牌。此库将使用simple-salesforce包来处理基于密码的认证。

from salesforce_bulk import SalesforceBulk

bulk = SalesforceBulk(username=username, password=password, security_token=security_token)
...

或者,如果您有会话ID和实例URL的访问权限,可以直接使用这些信息

from urlparse import urlparse
from salesforce_bulk import SalesforceBulk

bulk = SalesforceBulk(sessionId=sessionId, host=urlparse(instance_url).hostname)
...

操作

驱动批量API的基本顺序是

  1. 创建一个新的作业

  2. 将一个或多个批次添加到作业中

  3. 关闭作业

  4. 等待每个批次完成

批量查询

bulk.create_query_job(object_name, contentType='JSON')

使用API v39.0或更高版本,您还可以使用查询所有操作

bulk.create_queryall_job(object_name, contentType='JSON')

示例

import json
from salesforce_bulk.util import IteratorBytesIO

job = bulk.create_query_job("Contact", contentType='JSON')
batch = bulk.query(job, "select Id,LastName from Contact")
bulk.close_job(job)
while not bulk.is_batch_done(batch):
    sleep(10)

for result in bulk.get_all_results_for_query_batch(batch):
    result = json.load(IteratorBytesIO(result))
    for row in result:
        print row # dictionary rows

相同的示例,但针对CSV

import unicodecsv

job = bulk.create_query_job("Contact", contentType='CSV')
batch = bulk.query(job, "select Id,LastName from Contact")
bulk.close_job(job)
while not bulk.is_batch_done(batch):
    sleep(10)

for result in bulk.get_all_results_for_query_batch(batch):
    reader = unicodecsv.DictReader(result, encoding='utf-8')
    for row in reader:
        print(row) # dictionary rows

请注意,虽然CSV由于历史原因默认使用,但JSON应更受青睐,因为CSV在处理NULL与空字符串时存在一些缺点。

主键块头

如果您正在查询大量记录,您可能希望启用主键分块

bulk.create_query_job(object_name, contentType='CSV', pk_chunking=True)

这将使用默认的块大小设置。您可以通过提供每个块中的记录数来使用不同的块大小

bulk.create_query_job(object_name, contentType='CSV', pk_chunking=100000)

此外,如果您想要做更复杂的事情,您可以提供一个头值

bulk.create_query_job(object_name, contentType='CSV', pk_chunking='chunkSize=50000; startRow=00130000000xEftMGH')

批量插入、更新、删除

所有批量上传操作的工作方式相同。您在创建作业时设置操作。然后提交一个或多个指定要插入/更新/删除的记录及其列的文档。在删除时,您应只提交每个记录的Id。

为了提高效率,您应使用post_batch方法来发布每批数据。(注意,一个批次最多可以有10,000条记录,大小为1GB。)您将生成器或迭代器传递到该函数中,它将通过POST将数据流式传输到Salesforce。如果您需要发送CSV格式数据的帮助,可以使用salesforce_bulk.CsvDictsAdapter类。它接受返回字典的迭代器,并返回产生CSV数据的迭代器。

完整示例

from salesforce_bulk import CsvDictsAdapter

job = bulk.create_insert_job("Account", contentType='CSV')
accounts = [dict(Name="Account%d" % idx) for idx in xrange(5)]
csv_iter = CsvDictsAdapter(iter(accounts))
batch = bulk.post_batch(job, csv_iter)
bulk.wait_for_batch(job, batch)
bulk.close_job(job)
print("Done. Accounts uploaded.")

并发模式

在创建作业时,通过传递concurrency='Serial'concurrency='Parallel'来设置作业的并发模式。

项目详情


下载文件

下载适合您平台文件的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分发

salesforce-bulk-2.2.0.tar.gz (12.3 kB 查看哈希值)

上传时间

由以下支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面