Kusto Ingestion Tools (Kit)
项目描述
Kusto Ingestion Tools (Kit)
azure-kusto-ingestion-tools 一个用于帮助数据导入的简单工具包,可在以下位置获取
目的
使数据导入更简单(至少对于常见情况)。通过Azure门户创建ADX(Kusto)集群后,我们希望探索/可视化一些数据。当我们评估数据存储/工具时,通常只想进行POC能力测试并快速进行。
这就是创建此项目的原因。它包含以下功能来支持
- 数据源 模式 推断(csv / kusto/ ...)
- 常见 导入 场景(从文件/整个文件夹/ ...)
- 其他有用工具(kql生成器,...)
目录
概念
给定一个数据源,通常工作流程将包括
- 描述数据源。
- 准备目标数据存储(在我们的情况下,Kusto)
- 映射源到目标
- 加载数据
- 可选:自动化/转移到生产环境
安装
最低要求
- Python 3.7+
- 查看setup.py以获取依赖项
Pip
要使用Python包索引(PyPI)安装,请键入
pip install azure-kusto-ingestion-tools
这将公开新的命令行命令: kit --help
使用方法
基本
kit ingest -d /path/to/data/imdb -h mycluster.westus
以下命令将尝试使用类型推断导入 /path/to/data/imdb
(非递归)中的所有文件。
<!>注意<!>:如果没有提供任何其他参数,此命令非常 有偏见,并将假定以下内容
选项
身份验证
每个需要通过 kusto 验证的命令都需要验证参数。
默认情况下,将尝试从Azure CLI 获取令牌。
其他选项包括:
应用
套件 [命令] -app {app_id}:{app_secret}
用户
套件 [命令] -user {user_id}:{password}
命名
- 数据库 将设置为数据所在的目录,因此
/path/to/data/imdb
将寻找,并在需要时创建名为imdb
的数据库。
如果需要更多控制,请尝试--database
- 表 是实际文件名,因此
/path/to/data/imdb/aka_name.csv
将寻找,并在需要时创建名为aka_name
的表。
这可以通过确保数据被分割到文件夹中来实现,其中任何文件夹都是一个表。
这种递归模式假定所有文件具有相同的表结构。
文件
数据库模式文件
这是一种描述数据库的简单方法。
这可以使用纯 JSON 格式来描述数据库模式,从而轻松复制整个数据库模式。
{
"name": "imdb",
"tables": [{
"name": "aka_name",
"columns": [{
"dtype": "int",
"name": "id",
"index": 0
}, {
"dtype": "int",
"name": "person_id",
"index": 1
}, {
"dtype": "string",
"name": "name",
"index": 2
}, {
"dtype": "string",
"name": "imdb_index",
"index": 3
}, {
"dtype": "string",
"name": "name_pcode_cf",
"index": 4
}, {
"dtype": "string",
"name": "name_pcode_nf",
"index": 5
}, {
"dtype": "string",
"name": "surname_pcode",
"index": 6
}, {
"dtype": "string",
"name": "md5sum",
"index": 7
}]
},
...
]
}
从现有集群
kit schema create -h 'https://mycluster.kusto.windows.net' -db imdb > imdb_schema.json
从 SQL 文件
kit schema create -sql imdb.sql > schema.json
从包含原始数据的文件夹
kit schema create -d path/to/dir > schema.json
更多内容将陆续推出...
清单文件
一个用于描述可以稍后运行的摄取详细信息的文件
{
"databases": [ "same as schema.json" ],
"mappings": [{
"name": "aka_name_from_csv",
"columns": [{
"source": {
"index": 0,
"data_type": "str"
},
"target": {
"index": 0,
"data_type": "str"
}
}]
}],
"operations": [{
"database": "imdb",
"sources": [{
"files": ["1.csv", "...", "99.csv"],
"mapping": "aka_name_from_csv"
}],
"target": [ "aka_name" ]
}]
}
示例
示例 1:摄取 IMDB 数据集,CSV 文件(用于连接顺序基准测试)
一个有用的场景是将整个现有数据集加载到 Kusto 中。
以论文 How good are query optimizers really? 中使用的 连接顺序基准测试 为例。
1. 将文件复制到本地目录
从 Azure 存储下载
wget https://imdb2013dataset.blob.core.windows.net/data/imdb.tgz --no-check-certificate
或
curl https://imdb2013dataset.blob.core.windows.net/data/imdb.tgz --output imdb.tgz
原始文件可用,但格式不正确(不符合https://tools.ietf.org/html/rfc4180)。
可以使用像xsv这样的工具来修复它们,
但我们将错误处理留到另一个部分。
2. 解压文件
tar -xvzf imdb.tgz
3. 下载 SQL 创建命令
wget https://raw.githubusercontent.com/gregrahn/join-order-benchmark/master/schema.sql -O imdb.sql --no-check-certificate
或
curl https://raw.githubusercontent.com/gregrahn/join-order-benchmark/master/schema.sql --output imdb.sql
4. 从 SQL 语句创建模式
kit schema create -sql schema.sql > imdb_schema.json
5. 在集群上应用模式
假设我们已经有了一个集群,并且我们已经使用 az cli 登录,我们只需将模式应用到我们选择的数据库上即可
kit schema apply -f imdb_schema.json -h mycluster.westus -db imdb
6. 从本地文件摄取数据
kit ingest -d . --pattern "*.csv" -h mycluster.westus -db imdb
7. 查询
使用 Azure 门户,您现在可以轻松登录并查询您的数据。
您始终可以通过比较源行数和目标列数来确保数据已加载
xsv count aka_name.csv
- 应显示 901343 行
或
wc -l aka_name.csv
- 应显示 901343 行
从 kusto 查询应显示相同
kit count --table aka_name -h mycluster.westus -db imdb
- 应显示 901343
并查看数据: kit peek --table aka_name -n 10 -h mycluster.westus -db imdb
示例 2:摄取 Kaggle ML 数据集,CSV 和 JSON
Kaggale 有大量有趣的用于 ML/AI 的数据集。
让我们尝试摄取一些
https://www.kaggle.com/mlg-ulb/creditcardfraud/ https://www.kaggle.com/START-UMD/gtd/
上传到我们的Azure存储空间以方便访问
wget https://imdb2013dataset.blob.core.windows.net/data/creditcard.csv.gz --no-check-certificate
wget https://imdb2013dataset.blob.core.windows.net/data/globalterrorism.csv.gz --no-check-certificate
wget https://imdb2013dataset.blob.core.windows.net/data/arxivData.csv.gz --no-check-certificate
或
curl https://imdb2013dataset.blob.core.windows.net/data/creditcard.csv.gz --output creditcard.csv.gz
curl https://imdb2013dataset.blob.core.windows.net/data/globalterrorism.csv.gz --output globalterrorism.csv.gz
curl https://imdb2013dataset.blob.core.windows.net/data/arxivData.json.gz --output arxivData.json.gz
下载并解压后,思路相同,但这次文件包含标题,因此可以推断模式
kit ingest -d . -h mycluster.westus -db ml --headers
示例 3:复杂的嵌套JSON映射
让我们看看一个更高级的用例
wget https://imdb2013dataset.blob.core.windows.net/data/demo.json --no-check-certificate
或
curl https://imdb2013dataset.blob.core.windows.net/data/demo.json --output demo.json
假设我们的数据是json lines文件,其中每个条目看起来像
{"header":{"time":"24-Aug-18 09:42:15", "id":"0944f542-a637-411b-94dd-8874992d6ebc", "api_version":"v2"}, "payload":{"data":"NEEUGQSPIPKDPQPIVFE", "user":"owild@fabrikam.com"}}
看起来我们有一个嵌套对象。因为我们不确定会发生什么,所以让我们先进行dry run。让我们尝试使用--dry
和--object-depth 2
来运行一个ingestion。
kit ingest -f demo.json --object-depth 2 -h mycluster.westus -db ml --dry > manifest.json
这会产生以下manifest.json
,其中包含要执行的操作。
{
"databases": [
{
"name": "ml",
"tables": []
}
],
"mappings": [
{
"name": "demo_from_json",
"columns": [
{
"source": {
"dtype": "string",
"name": "header.time",
"index": null
},
"target": {
"dtype": "string",
"name": "header.time",
"index": null
}
},
{
"source": {
"dtype": "string",
"name": "header.id",
"index": null
},
"target": {
"dtype": "string",
"name": "header.id",
"index": null
}
},
{
"source": {
"dtype": "string",
"name": "header.api_version",
"index": null
},
"target": {
"dtype": "string",
"name": "header.api_version",
"index": null
}
},
{
"source": {
"dtype": "string",
"name": "payload.data",
"index": null
},
"target": {
"dtype": "string",
"name": "payload.data",
"index": null
}
},
{
"source": {
"dtype": "string",
"name": "payload.user",
"index": null
},
"target": {
"dtype": "string",
"name": "payload.user",
"index": null
}
}
]
}
],
"operations": [
{
"database": "ml",
"sources": [
{
"files": [
"C:\\Users\\dadubovs\\temp\\ml_datasets\\demo.json"
],
"mapping": "demo_from_json",
"options": {},
"data_format": "json"
}
],
"target": "demo"
}
]
}
现在,假设我们不需要id
字段,我们可以编辑映射并保存它。
如果我们仍然不确定,并希望更好地理解将要创建的命令,我们可以检查kql
kit kql -m manifest.json
它应该输出类似以下内容
// Table Creation Commands:
.create table demo (['header.time']:string,['header.api_version']:string,['payload.data']:string,['payload.user']:string)
// Ingestion Mapping Creation Commands:
.create table demo ingestion json mapping "demo_from_json" '[{"column":"header.time","path":"$.header.time","datatype":"string"},{"column":"header.api_version","path":"$.header.api_version","datatype":"string"},{"column":"payload.data","path":"$.payload.data","datatype":"string"},{"column":"payload.user","path":"$.payload.user","datatype":"string"}]'
一旦我们准备好了,我们可以根据manifest继续我们的ingestion
kit ingest -m manifest.json -h mycluster.westus
项目详情
azure-kusto-ingestion-tools-0.3.1.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | fee1bb71cf9f9a9421ea16fd216b78c37889b644100166d7bfa1b20aa982afc8 |
|
MD5 | fa99f27048706232cd482c951900e009 |
|
BLAKE2b-256 | b072f2f6844c77de934ba06cbabe6df4022f2653f224f5540238f1c7674a4ce4 |