跳转到主要内容

AWS VPC 流日志读取器

项目描述

简介

Build Status PyPI Version

亚马逊的VPC流日志类似于NetFlow和IPFIX日志,可用于安全和性能分析。Observable Networks 使用VPC流日志作为安全监控的端点建模的输入。

本项目包含

  • 用于在命令行上处理VPC流日志的实用程序
  • 用于检索和处理VPC流日志的Python库

该工具支持从CloudWatch LogsS3 读取流日志。对于S3目标,支持版本3的自定义日志格式。

该库基于boto3构建,应在Python 3的支持版本上工作。

有关VPC流日志及其启用方法的信息,请参阅AWS博客上的这篇文章。在从亚马逊Kinesis检索VPC流日志时,您可以使用此库与kinesis-logs-reader库一起使用。

安装

您可以通过使用 pip 来获取 flowlogs_reader

pip install flowlogs_reader

如果您想从源代码安装并/或贡献,您可以从 GitHub 克隆。

git clone https://github.com/obsrvbl-oss/flowlogs-reader.git
cd flowlogs-reader
python setup.py develop

CLI 使用方法

flowlogs-reader 提供了一个名为 flowlogs_reader 的命令行界面,允许您将 VPC 流日志记录打印到屏幕。它假定您的 AWS 凭据通过环境变量、boto 配置文件或 IAM 元数据提供。以下是一些示例用法。

位置类型

flowlogs_reader 有一个必需的参数,location。默认情况下,这被解释为 CloudWatch 日志组。

要使用 S3 位置,请指定 --location-type='s3'

  • flowlogs_reader --location-type="s3" "bucket-name/optional-prefix"

打印流

默认操作是 print 流。您还可以指定 ipsetfindipaggregate 操作。

  • flowlogs_reader location - 打印过去一小时的全部流量
  • flowlogs_reader location print 10 - 打印过去一小时的前 10 条流量
  • flowlogs_reader location ipset - 打印过去一小时中看到的唯一 IP 地址
  • flowlogs_reader location findip 198.51.100.2 - 打印涉及 198.51.100.2 的所有流量
  • flowlogs_reader location aggregate - 按照五元组聚合流量,然后以制表符分隔的流的形式打印(带标题)。这要求五元组中的每个字段都在数据格式中存在。

您可以组合 flowlogs_reader 的输出与其他命令行工具

  • flowlogs_reader location | grep REJECT - 打印所有 REJECT 流日志记录
  • flowlogs_reader location | awk '$6 = 443' - 打印来自端口 443 的所有流量

时间窗口

默认时间窗口是最后一个小时。您还可以指定 --start-time 和/或 --end-time。可以使用 -s-e 开关。

  • flowlogs_reader --start-time='2015-08-13 00:00:00' location
  • flowlogs_reader --end-time='2015-08-14 00:00:00' location
  • flowlogs_reader --start-time='2015-08-13 01:00:00' --end-time='2015-08-14 02:00:00' location

使用 --time-format 开关来控制如何解释开始和结束时间。默认值为 '%Y-%m-%d %H:%M:%S'。有关格式字符串的信息,请参阅 Python 文档中的 strptime

并发读取

使用 --thread-count 来同时从多个日志组或 S3 键读取

  • flowlogs_reader --thread_count=4 location

AWS 选项

其他命令行开关

  • flowlogs_reader --region='us-west-2' location - 连接到指定的 AWS 区域
  • flowlogs_reader --profile='dev_profile' location - 使用您的本地 AWS 配置文件中的配置文件来指定凭据和区域
  • flowlogs_reader --role-arn='arn:aws:iam::12345678901:role/myrole' --external-id='0a1b2c3d' location - 使用提供的角色和外部 ID 通过 sts assume-role 连接到第三方账户

对于 CloudWatch Logs 位置

  • flowlogs_reader --fields='${version} ${account-id} ${interface-id} ${srcaddr} ${dstaddr} ${srcport} ${dstport} ${protocol} ${packets} ${bytes} ${start} ${end} ${action} ${log-status}' - 使用给定的 fields 防止模块查询 EC2 的日志行格式
  • flowlogs_reader --filter-pattern='REJECT' location - 使用给定的 filter pattern 让服务器限制输出

对于 S3 位置

  • flowlogs_reader --location-type='s3' --include-accounts='12345678901,12345678902' bucket-name/optional-prefix - 只返回给定账户的日志
  • flowlogs_reader --location-type='s3' --include-regions='us-east-1,us-east-2' bucket-name/optional-prefix - 只返回给定区域的日志

模块使用方法

FlowRecord 从日志流中检索到的 event 字典中获取。它解析事件中的 message,记录如下所示

2 123456789010 eni-102010ab 198.51.100.1 192.0.2.1 443 49152 6 10 840 1439387263 1439387264 ACCEPT OK

并将其转换为如下所示的 Python 对象

>>> flow_record.srcaddr
'198.51.100.1'
>>> flow_record.dstaddr
'192.0.2.1'
>>> flow_record.srcport
443
>>> flow_record.to_dict()
{'account_id': '123456789010',
 'action': 'ACCEPT',
 'bytes': 840,
 'dstaddr': '192.0.2.1',
 'dstport': 49152,
 'end': datetime.datetime(2015, 8, 12, 13, 47, 44),
 'interface_id': 'eni-102010ab',
 'log_status': 'OK',
 'packets': 10,
 'protocol': 6,
 'srcaddr': '198.51.100.1',
 'srcport': 443,
 'start': datetime.datetime(2015, 8, 12, 13, 47, 43),
 'version': 2}

FlowLogsReader 从 CloudWatch 日志中读取。它接受日志组的名称,然后可以生成该组中的所有流日志记录。

>>> from flowlogs_reader import FlowLogsReader
... flow_log_reader = FlowLogsReader('flowlog_group')
... records = list(flow_log_reader)
... print(len(records))
176

S3FlowLogsReader 从 S3 读取。它接受 bucket 名称或 bucket/prefix 标识符。

默认情况下,这些类将生成过去一小时的记录。

您可以使用这些参数控制要检索的内容

  • start_timeend_time 是 Python datetime.datetime 对象
  • region_name 是类似于 'us-east-1' 的字符串。
  • boto_client 是 boto3 客户端对象。

当使用 FlowLogsReader 与 CloudWatch 日志一起时

  • fields 关键字是一个类似于 ('version', 'account-id') 的元组。如果未提供,则将查询 EC2 API 以确定日志格式。
  • filter_pattern 关键字是一个类似于 REJECT443 的字符串,用于过滤日志。请参阅下面的示例。

当使用 S3FlowLogsReader 与 S3 一起时

  • include_accounts 关键字是用于过滤日志的账户标识符的迭代器(作为字符串)。
  • include_regions 关键字是用于过滤日志的区域名称的迭代器。

示例

首先导入 FlowLogsReader

from flowlogs_reader import FlowLogsReader

查找 VPC 内部通信的所有 IP 地址

ip_set = set()
for record in FlowLogsReader('flowlog_group'):
    ip_set.add(record.srcaddr)
    ip_set.add(record.dstaddr)

查看一个 IP 地址的所有流量

target_ip = '192.0.2.1'
records = []
for record in FlowLogsReader('flowlog_group'):
    if (record.srcaddr == target_ip) or (record.dstaddr == target_ip):
        records.append(record)

遍历几个预配置的配置文件并收集所有 IP 地址

ip_set = set()
profile_names = ['profile1', 'profile2']
for profile_name in profile_names:
    for record in FlowLogsReader('flowlog_group', profile_name=profile_name):
        ip_set.add(record.srcaddr)
        ip_set.add(record.dstaddr)

应用对已正常记录的 UDP 流量的过滤器(仅限 CloudWatch 日志)

FILTER_PATTERN = (
    '[version="2", account_id, interface_id, srcaddr, dstaddr, '
    'srcport, dstport, protocol="17", packets, bytes, '
    'start, end, action, log_status="OK"]'
)

flow_log_reader = FlowLogsReader('flowlog_group', filter_pattern=FILTER_PATTERN)
records = list(flow_log_reader)
print(len(records))

从一系列区域检索日志

from flowlogs_reader import S3FlowLogsReader

reader = S3FlowLogsReader('example-bucket/optional-prefix', include_regions=['us-east-1', 'us-east-2'])
records = list(reader)
print(len(records))

您可以使用 aggregate_records 函数对记录进行聚合。传入一个 FlowLogsReaderS3FlowLogsReader 对象,可选地传入一个 key_fields 元组。将生成表示聚合流记录的 Python dict 对象。默认情况下将使用典型的 ('srcaddr', 'dstaddr', 'srcport', 'dstport', 'protocol')。将聚合 startendpacketsbytes 项目。

flow_log_reader = FlowLogsReader('flowlog_group')
key_fields = ('srcaddr', 'dstaddr')
records = list(aggregated_records(flow_log_reader, key_fields=key_fields))

迭代后处理的字节数可在 bytes_processed 属性中找到。对于 S3FlowLogsReader 实例,还有一个 compressed_bytes_processed 属性。

项目详情


下载文件

下载您平台上的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分发

flowlogs_reader-5.0.1.tar.gz (27.5 kB 查看哈希值)

上传时间

构建分发

flowlogs_reader-5.0.1-py3-none-any.whl (18.9 kB 查看哈希值)

上传时间 Python 3

由以下支持