跳转到主要内容

Kusto Ingestion Tools (Kit)

项目描述

Kusto Ingestion Tools (Kit)

azure-kusto-ingestion-tools 一个用于帮助数据导入的简单工具包,可在以下位置获取
PyPI version Downloads

目的

使数据导入更简单(至少对于常见情况)。通过Azure门户创建ADX(Kusto)集群后,我们希望探索/可视化一些数据。当我们评估数据存储/工具时,通常只想进行POC能力测试并快速进行。

这就是创建此项目的原因。它包含以下功能来支持

  1. 数据源 模式 推断(csv / kusto/ ...)
  2. 常见 导入 场景(从文件/整个文件夹/ ...)
  3. 其他有用工具(kql生成器,...)

目录

概念

给定一个数据源,通常工作流程将包括

  1. 描述数据源。
  2. 准备目标数据存储(在我们的情况下,Kusto)
  3. 映射源到目标
  4. 加载数据
  5. 可选:自动化/转移到生产环境

安装

最低要求

  • Python 3.7+
  • 查看setup.py以获取依赖项

Pip

要使用Python包索引(PyPI)安装,请键入

pip install azure-kusto-ingestion-tools

这将公开新的命令行命令: kit --help

使用方法

基本

kit ingest -d /path/to/data/imdb -h mycluster.westus

以下命令将尝试使用类型推断导入 /path/to/data/imdb(非递归)中的所有文件。

<!>注意<!>:如果没有提供任何其他参数,此命令非常 有偏见,并将假定以下内容

选项

身份验证

每个需要通过 kusto 验证的命令都需要验证参数。

默认情况下,将尝试从Azure CLI 获取令牌。

其他选项包括:

应用

套件 [命令] -app {app_id}:{app_secret}

用户

套件 [命令] -user {user_id}:{password}

命名

  • 数据库 将设置为数据所在的目录,因此 /path/to/data/imdb 将寻找,并在需要时创建名为 imdb 的数据库。
    如果需要更多控制,请尝试 --database
  • 是实际文件名,因此 /path/to/data/imdb/aka_name.csv 将寻找,并在需要时创建名为 aka_name 的表。
    这可以通过确保数据被分割到文件夹中来实现,其中任何文件夹都是一个表。
    这种递归模式假定所有文件具有相同的表结构。

文件

数据库模式文件

这是一种描述数据库的简单方法。

这可以使用纯 JSON 格式来描述数据库模式,从而轻松复制整个数据库模式。

{
    "name": "imdb",
    "tables": [{
        "name": "aka_name",
        "columns": [{
            "dtype": "int",
            "name": "id",
            "index": 0
        }, {
            "dtype": "int",
            "name": "person_id",
            "index": 1
        }, {
            "dtype": "string",
            "name": "name",
            "index": 2
        }, {
            "dtype": "string",
            "name": "imdb_index",
            "index": 3
        }, {
            "dtype": "string",
            "name": "name_pcode_cf",
            "index": 4
        }, {
            "dtype": "string",
            "name": "name_pcode_nf",
            "index": 5
        }, {
            "dtype": "string",
            "name": "surname_pcode",
            "index": 6
        }, {
            "dtype": "string",
            "name": "md5sum",
            "index": 7
        }]
    },
    ...
    ]  
}

从现有集群

kit schema create -h 'https://mycluster.kusto.windows.net' -db imdb > imdb_schema.json

从 SQL 文件

kit schema create -sql imdb.sql > schema.json

从包含原始数据的文件夹

kit schema create -d path/to/dir > schema.json

更多内容将陆续推出...

清单文件

一个用于描述可以稍后运行的摄取详细信息的文件

{  
 "databases": [ "same as schema.json" ], 
 "mappings": [{ 
  "name": "aka_name_from_csv", 
  "columns": [{ 
    "source": { 
        "index": 0, 
      "data_type": "str" 
      }, 
      "target": { 
        "index": 0, 
        "data_type": "str" 
       }
    }] 
   }], 
   "operations": [{ 
      "database": "imdb", 
      "sources": [{ 
        "files": ["1.csv", "...", "99.csv"], 
      "mapping": "aka_name_from_csv" 
      }], 
      "target": [ "aka_name" ] 
   }]
 }  

示例

示例 1:摄取 IMDB 数据集,CSV 文件(用于连接顺序基准测试)

一个有用的场景是将整个现有数据集加载到 Kusto 中。
以论文 How good are query optimizers really? 中使用的 连接顺序基准测试 为例。

1. 将文件复制到本地目录

从 Azure 存储下载
wget https://imdb2013dataset.blob.core.windows.net/data/imdb.tgz --no-check-certificate

curl https://imdb2013dataset.blob.core.windows.net/data/imdb.tgz --output imdb.tgz

原始文件可用,但格式不正确(不符合https://tools.ietf.org/html/rfc4180)。
可以使用像xsv这样的工具来修复它们,
但我们将错误处理留到另一个部分。

2. 解压文件

tar -xvzf imdb.tgz

3. 下载 SQL 创建命令

wget https://raw.githubusercontent.com/gregrahn/join-order-benchmark/master/schema.sql -O imdb.sql --no-check-certificate

curl https://raw.githubusercontent.com/gregrahn/join-order-benchmark/master/schema.sql --output imdb.sql

4. 从 SQL 语句创建模式

kit schema create -sql schema.sql > imdb_schema.json

5. 在集群上应用模式

假设我们已经有了一个集群,并且我们已经使用 az cli 登录,我们只需将模式应用到我们选择的数据库上即可

kit schema apply -f imdb_schema.json -h mycluster.westus -db imdb

6. 从本地文件摄取数据

kit ingest -d . --pattern "*.csv" -h mycluster.westus -db imdb

7. 查询

使用 Azure 门户,您现在可以轻松登录并查询您的数据。

您始终可以通过比较源行数和目标列数来确保数据已加载

xsv count aka_name.csv - 应显示 901343 行

wc -l aka_name.csv - 应显示 901343 行

从 kusto 查询应显示相同

kit count --table aka_name -h mycluster.westus -db imdb - 应显示 901343

并查看数据: kit peek --table aka_name -n 10 -h mycluster.westus -db imdb

示例 2:摄取 Kaggle ML 数据集,CSV 和 JSON

Kaggale 有大量有趣的用于 ML/AI 的数据集。

让我们尝试摄取一些

https://www.kaggle.com/mlg-ulb/creditcardfraud/ https://www.kaggle.com/START-UMD/gtd/

上传到我们的Azure存储空间以方便访问

wget https://imdb2013dataset.blob.core.windows.net/data/creditcard.csv.gz --no-check-certificate  
wget https://imdb2013dataset.blob.core.windows.net/data/globalterrorism.csv.gz --no-check-certificate
wget https://imdb2013dataset.blob.core.windows.net/data/arxivData.csv.gz --no-check-certificate

curl https://imdb2013dataset.blob.core.windows.net/data/creditcard.csv.gz --output creditcard.csv.gz
curl https://imdb2013dataset.blob.core.windows.net/data/globalterrorism.csv.gz --output globalterrorism.csv.gz   
curl https://imdb2013dataset.blob.core.windows.net/data/arxivData.json.gz --output arxivData.json.gz

下载并解压后,思路相同,但这次文件包含标题,因此可以推断模式

kit ingest -d . -h mycluster.westus -db ml --headers

示例 3:复杂的嵌套JSON映射

让我们看看一个更高级的用例

wget https://imdb2013dataset.blob.core.windows.net/data/demo.json --no-check-certificate

curl https://imdb2013dataset.blob.core.windows.net/data/demo.json --output demo.json

假设我们的数据是json lines文件,其中每个条目看起来像

{"header":{"time":"24-Aug-18 09:42:15", "id":"0944f542-a637-411b-94dd-8874992d6ebc", "api_version":"v2"}, "payload":{"data":"NEEUGQSPIPKDPQPIVFE", "user":"owild@fabrikam.com"}}

看起来我们有一个嵌套对象。因为我们不确定会发生什么,所以让我们先进行dry run。让我们尝试使用--dry--object-depth 2来运行一个ingestion。

kit ingest -f demo.json --object-depth 2 -h mycluster.westus -db ml --dry > manifest.json

这会产生以下manifest.json,其中包含要执行的操作。

{
  "databases": [
    {
      "name": "ml",
      "tables": []
    }
  ],
  "mappings": [
    {
      "name": "demo_from_json",
      "columns": [
        {
          "source": {
            "dtype": "string",
            "name": "header.time",
            "index": null
          },
          "target": {
            "dtype": "string",
            "name": "header.time",
            "index": null
          }
        },
        {
          "source": {
            "dtype": "string",
            "name": "header.id",
            "index": null
          },
          "target": {
            "dtype": "string",
            "name": "header.id",
            "index": null
          }
        },
        {
          "source": {
            "dtype": "string",
            "name": "header.api_version",
            "index": null
          },
          "target": {
            "dtype": "string",
            "name": "header.api_version",
            "index": null
          }
        },
        {
          "source": {
            "dtype": "string",
            "name": "payload.data",
            "index": null
          },
          "target": {
            "dtype": "string",
            "name": "payload.data",
            "index": null
          }
        },
        {
          "source": {
            "dtype": "string",
            "name": "payload.user",
            "index": null
          },
          "target": {
            "dtype": "string",
            "name": "payload.user",
            "index": null
          }
        }
      ]
    }
  ],
  "operations": [
    {
      "database": "ml",
      "sources": [
        {
          "files": [
            "C:\\Users\\dadubovs\\temp\\ml_datasets\\demo.json"
          ],
          "mapping": "demo_from_json",
          "options": {},
          "data_format": "json"
        }
      ],
      "target": "demo"
    }
  ]
}

现在,假设我们不需要id字段,我们可以编辑映射并保存它。

如果我们仍然不确定,并希望更好地理解将要创建的命令,我们可以检查kql

kit kql -m manifest.json

它应该输出类似以下内容

// Table Creation Commands:
.create table demo (['header.time']:string,['header.api_version']:string,['payload.data']:string,['payload.user']:string)

// Ingestion Mapping Creation Commands:
.create table demo ingestion json mapping "demo_from_json" '[{"column":"header.time","path":"$.header.time","datatype":"string"},{"column":"header.api_version","path":"$.header.api_version","datatype":"string"},{"column":"payload.data","path":"$.payload.data","datatype":"string"},{"column":"payload.user","path":"$.payload.user","datatype":"string"}]'

一旦我们准备好了,我们可以根据manifest继续我们的ingestion

kit ingest -m manifest.json -h mycluster.westus

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分发

azure-kusto-ingestion-tools-0.3.1.tar.gz (24.9 kB 查看哈希值)

上传时间

由以下支持

AWSAWS 云计算和安全赞助商 DatadogDatadog 监控 FastlyFastly CDN GoogleGoogle 下载分析 MicrosoftMicrosoft PSF赞助商 PingdomPingdom 监控 SentrySentry 错误日志 StatusPageStatusPage 状态页面