跳转到主要内容

异步文件操作。

项目描述

Github Actions Latest Version https://img.shields.io/pypi/wheel/aiofile.svg https://img.shields.io/pypi/pyversions/aiofile.svg https://img.shields.io/pypi/l/aiofile.svg https://coveralls.io/repos/github/mosquito/aiofile/badge.svg?branch=master

使用asyncio支持的真正异步文件操作。

状态

开发 - 稳定

功能

  • 自2.0.0版本开始使用caio,其中包含linux libaio和两种基于线程的实现(基于C的纯Python)。

  • AIOFile没有内部指针。您应该在每次操作中传递offsetchunk_size,或者使用辅助工具(Reader或Writer)。最简单的方法是使用async_open来创建具有文件接口的对象。

  • 对于Linux,使用基于libaio的实现。

  • 对于POSIX(MacOS X和可选的Linux),使用基于threadpool的实现。

  • 否则使用纯Python的基于线程的实现。

  • 实现会根据系统兼容性自动选择。

限制

  • Linux 本地 AIO 实现无法打开特殊文件。对特殊文件系统(如 /proc//sys/)进行的异步操作不被内核支持。这不是 aiofile 或 caio 的问题。在这种情况下,你可能需要切换到基于线程的实现(参见 故障排除 部分)。然而,当在支持的文件系统上使用时,Linux 实现的开销更小,因此更受青睐,但它并不是万能的。

代码示例

所有代码示例都需要 Python 3.6+。

高级 API

async_open 辅助函数

辅助函数模拟 Python 文件类似对象,它返回具有类似但异步方法的对象。

支持的方法

  • async def read(length = -1) - 从文件中读取块,当 length 为 -1 时,将读取文件到末尾。

  • async def write(data) - 将块写入文件

  • def seek(offset) - 设置文件指针位置

  • def tell() - 返回当前文件指针位置

  • async def readline(size=-1, newline="\n") - 读取直到换行符或 EOF 的块。自 3.7.0 版本起,__aiter__ 返回 LineReader

    此方法对小行不太理想,因为它不会重用读取缓冲区。当你想按行读取文件时,请避免使用 async_open,而应使用 LineReader

  • def __aiter__() -> LineReader - 行的迭代器。

  • def iter_chunked(chunk_size: int = 32768) -> Reader - 块的迭代器。

  • `.file` 属性包含 AIOFile 对象

基本示例

import asyncio
from pathlib import Path
from tempfile import gettempdir

from aiofile import async_open

tmp_filename = Path(gettempdir()) / "hello.txt"

async def main():
    async with async_open(tmp_filename, 'w+') as afp:
        await afp.write("Hello ")
        await afp.write("world")
        afp.seek(0)

        print(await afp.read())

        await afp.write("Hello from\nasync world")
        print(await afp.readline())
        print(await afp.readline())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

无上下文管理器示例

import asyncio
import atexit
import os
from tempfile import mktemp

from aiofile import async_open


TMP_NAME = mktemp()
atexit.register(os.unlink, TMP_NAME)


async def main():
    afp = await async_open(TMP_NAME, "w")
    await afp.write("Hello")
    await afp.close()


asyncio.run(main())
assert open(TMP_NAME, "r").read() == "Hello"

连接示例程序(cat

import asyncio
import sys
from argparse import ArgumentParser
from pathlib import Path

from aiofile import async_open

parser = ArgumentParser(
    description="Read files line by line using asynchronous io API"
)
parser.add_argument("file_name", nargs="+", type=Path)

async def main(arguments):
    for src in arguments.file_name:
        async with async_open(src, "r") as afp:
            async for line in afp:
                sys.stdout.write(line)


asyncio.run(main(parser.parse_args()))

复制文件示例程序(cp

import asyncio
from argparse import ArgumentParser
from pathlib import Path

from aiofile import async_open

parser = ArgumentParser(
    description="Copying files using asynchronous io API"
)
parser.add_argument("source", type=Path)
parser.add_argument("dest", type=Path)
parser.add_argument("--chunk-size", type=int, default=65535)


async def main(arguments):
    async with async_open(arguments.source, "rb") as src, \
               async_open(arguments.dest, "wb") as dest:
        async for chunk in src.iter_chunked(arguments.chunk_size):
            await dest.write(chunk)


asyncio.run(main(parser.parse_args()))

打开已打开文件指针的示例

import asyncio
from typing import IO, Any
from aiofile import async_open


async def main(fp: IO[Any]):
    async with async_open(fp) as afp:
        await afp.write("Hello from\nasync world")
        print(await afp.readline())


with open("test.txt", "w+") as fp:
    asyncio.run(main(fp))

Linux 原生 aio 不支持读取和写入特殊文件(例如 procfs/sysfs/Unix 管道等),因此你可以使用兼容的上下文对象对这些文件执行操作。

import asyncio
from aiofile import async_open
from caio import thread_aio_asyncio
from contextlib import AsyncExitStack


async def main():
    async with AsyncExitStack() as stack:

        # Custom context should be reused
        ctx = await stack.enter_async_context(
            thread_aio_asyncio.AsyncioContext()
        )

        # Open special file with custom context
        src = await stack.enter_async_context(
            async_open("/proc/cpuinfo", "r", context=ctx)
        )

        # Open regular file with default context
        dest = await stack.enter_async_context(
            async_open("/tmp/cpuinfo", "w")
        )

        # Copying file content line by line
        async for line in src:
            await dest.write(line)


asyncio.run(main())

低级 API

AIOFile 类是异步文件操作的低级接口,其读取和写入方法接受在字节中进行的操作的 offset=0

这允许你在不移动虚拟光标的情况下对一个打开的文件执行许多独立的 IO 操作。

例如,你可以通过指定 Range 标头进行 10 个并发 HTTP 请求,并异步写入一个打开的文件,而偏移量必须手动计算,或者使用 10 个具有指定初始偏移量的 Writer 实例。

为了提供顺序读取和写入,有 WriterReaderLineReader。请注意,async_open 与 AIOFile 不同,它提供了类似文件操作的接口,它模拟了如内置 open 中实现的 read 或 write 等方法。

import asyncio
from aiofile import AIOFile


async def main():
    async with AIOFile("hello.txt", 'w+') as afp:
        payload = "Hello world\n"

        await asyncio.gather(
            *[afp.write(payload, offset=i * len(payload)) for i in range(10)]
        )

        await afp.fsync()

        assert await afp.read(len(payload) * 10) == payload * 10

asyncio.run(main())

实际上,低级 API 只是对 caio API 的轻微糖化。

import asyncio
from aiofile import AIOFile


async def main():
    async with AIOFile("/tmp/hello.txt", 'w+') as afp:
        await afp.write("Hello ")
        await afp.write("world", offset=7)
        await afp.fsync()

        print(await afp.read())


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

ReaderWriter

当你想线性地读取或写入文件时,以下示例可能会有所帮助。

import asyncio
from aiofile import AIOFile, Reader, Writer


async def main():
    async with AIOFile("/tmp/hello.txt", 'w+') as afp:
        writer = Writer(afp)
        reader = Reader(afp, chunk_size=8)

        await writer("Hello")
        await writer(" ")
        await writer("World")
        await afp.fsync()

        async for chunk in reader:
            print(chunk)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

LineReader - 逐行读取文件

LineReader 是一个辅助函数,当你想要线性地逐行读取文件时非常有效。

它包含一个缓冲区,并将文件的片段分块读取到缓冲区中,在那里它将尝试找到行。

默认块大小为 4KB。

import asyncio
from aiofile import AIOFile, LineReader, Writer


async def main():
    async with AIOFile("/tmp/hello.txt", 'w+') as afp:
        writer = Writer(afp)

        await writer("Hello")
        await writer(" ")
        await writer("World")
        await writer("\n")
        await writer("\n")
        await writer("From async world")
        await afp.fsync()

        async for line in LineReader(afp):
            print(line)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

当你想按行读取文件时,请避免使用 async_open,而应使用 LineReader

更多示例

使用 aiofile 的有用示例

异步 CSV 字典读取器

import asyncio
import io
from csv import DictReader

from aiofile import AIOFile, LineReader


class AsyncDictReader:
    def __init__(self, afp, **kwargs):
        self.buffer = io.BytesIO()
        self.file_reader = LineReader(
            afp, line_sep=kwargs.pop('line_sep', '\n'),
            chunk_size=kwargs.pop('chunk_size', 4096),
            offset=kwargs.pop('offset', 0),
        )
        self.reader = DictReader(
            io.TextIOWrapper(
                self.buffer,
                encoding=kwargs.pop('encoding', 'utf-8'),
                errors=kwargs.pop('errors', 'replace'),
            ), **kwargs,
        )
        self.line_num = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.line_num == 0:
            header = await self.file_reader.readline()
            self.buffer.write(header)

        line = await self.file_reader.readline()

        if not line:
            raise StopAsyncIteration

        self.buffer.write(line)
        self.buffer.seek(0)

        try:
            result = next(self.reader)
        except StopIteration as e:
            raise StopAsyncIteration from e

        self.buffer.seek(0)
        self.buffer.truncate(0)
        self.line_num = self.reader.line_num

        return result


async def main():
    async with AIOFile('sample.csv', 'rb') as afp:
        async for item in AsyncDictReader(afp):
            print(item)


asyncio.run(main())

故障排除

caio 的 Linux 实现在现代 Linux 内核版本和文件系统中运行正常。因此,您可能会遇到特定于您环境的兼容性问题。这不是一个错误,并且可能可以通过一些方法解决。

  1. 升级内核

  2. 使用兼容的文件系统

  3. 使用基于线程或纯 Python 实现的版本。

自 0.7.0 版本以来,caio 包含了一些实现此功能的途径。

1. 在运行时,使用环境变量 CAIO_IMPL 并指定可能的值

  • linux - 使用原生 Linux 内核 aio 机制

  • thread - 使用基于线程的 C 语言实现

  • python - 使用纯 Python 实现的版本

2. 文件 default_implementation 位于 caio 安装路径中靠近 __init__.py 的位置。这对于发行版包维护者很有用。此文件可能包含注释(以 # 符号开始的行)并且第一行应该是 linuxthreadpython 中的一个。

  1. 您可以手动管理上下文

import asyncio

from aiofile import async_open
from caio import linux_aio_asyncio, thread_aio_asyncio


async def main():
    linux_ctx = linux_aio_asyncio.AsyncioContext()
    threads_ctx = thread_aio_asyncio.AsyncioContext()

    async with async_open("/tmp/test.txt", "w", context=linux_ctx) as afp:
        await afp.write("Hello")

    async with async_open("/tmp/test.txt", "r", context=threads_ctx) as afp:
        print(await afp.read())


asyncio.run(main())

项目详情


下载文件

下载您平台上的文件。如果您不确定选择哪一个,请了解更多关于 安装包 的信息。

源分发

aiofile-3.8.8.tar.gz (19.5 kB 查看哈希值)

上传时间 源代码

构建分发

aiofile-3.8.8-py3-none-any.whl (19.9 kB 查看哈希值)

上传时间 Python 3

由以下支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面