跳转到主要内容

一个现代且易于使用的可流式zip文件生成器

项目描述

zipstream-ng

Status Version Python

一个现代且易于使用的可流式zip文件生成器。它可以在不使用临时文件或过多内存的情况下实时打包和流式传输多个文件和文件夹到zip文件中。它还可以在流式传输之前计算zip文件的最终大小。

特性

  • 按需实时生成zip数据。
  • 可以在生成开始之前计算最终zip文件的总大小。
  • 内存使用低:由于zip是按需生成的,因此内存中需要保留的数据非常少(峰值使用量通常小于20MB,即使是TB级别的文件)。
  • 灵活的API:典型用例简单,复杂的用例也是可能的。
  • 支持从文件、字节、字符串和任何其他可迭代对象中压缩数据。
  • 跟踪添加到zip文件中最近修改的文件的日期。
  • 线程安全:如果多个线程同时向同一流添加数据,则不会损坏数据。
  • 包含Python的http.server模块的克隆,并添加了对zip的支持。尝试python -m zipstream.server
  • 自动使用Zip64扩展,但仅当需要时才使用。
  • 无外部依赖。

非常适合Web后端

  • 实时生成zip数据需要很少的内存、没有磁盘使用,并且比创建整个zip文件更快地开始生成数据。这意味着响应更快、没有临时文件,内存使用非常低。
  • 在生成任何数据之前(假设未使用压缩)就能计算出流的总大小,这意味着网络后端可以在其响应中提供Content-Length头信息。这允许客户端在传输流时显示进度条。
  • 通过跟踪最近添加到zip文件中的文件的日期,网络后端可以提供Last-Modified头信息。这允许客户端通过仅进行HEAD请求来检查他们是否拥有zip文件的最新版本,而不是必须下载整个文件。

安装

pip install zipstream-ng

示例

创建本地zip文件(简单示例)

在当前目录下创建一个名为files.zip的归档文件,其中包含/path/to/files下的所有文件。

from zipstream import ZipStream

zs = ZipStream.from_path("/path/to/files/")

with open("files.zip", "wb") as f:
    f.writelines(zs)

创建本地zip文件(展示更多API功能)

from zipstream import ZipStream, ZIP_DEFLATED

# Create a ZipStream that uses the maximum level of Deflate compression.
zs = ZipStream(compress_type=ZIP_DEFLATED, compress_level=9)

# Set the zip file's comment.
zs.comment = "Contains compressed important files"

# Add all the files under a path.
# Will add all files under a top-level folder called "files" in the zip.
zs.add_path("/path/to/files/")

# Add another file (will be added as "data.txt" in the zip file).
zs.add_path("/path/to/file.txt", "data.txt")

# Add some random data from an iterable.
# This generator will only be run when the stream is generated.
def random_data():
    import random
    for _ in range(10):
        yield random.randbytes(1024)

zs.add(random_data(), "random.bin")

# Add a file containing some static text.
# Will automatically be encoded to bytes before being added (uses utf-8).
zs.add("This is some text", "README.txt")

# Write out the zip file as it's being generated.
# At this point the data in the files will be read in and the generator
# will be iterated over.
with open("files.zip", "wb") as f:
    f.writelines(zs)

zipserver(包含在内)

一个功能齐全且实用的示例可以在包含的zipstream.server模块中找到。它是对Python内置的http.server的克隆,增加了将多个文件和文件夹作为单个zip文件提供的能力。通过安装包并运行zipserver --helppython -m zipstream.server --help来试用。

zipserver screenshot

与Flask Web应用集成

一个基于Flask的非常基础的文件服务器,将请求路径下的所有文件作为zip文件流式传输到客户端。它在Content-Length头信息中提供流的总大小,以便客户端在下载流时显示进度条。它还提供了一个Last-Modified头信息,以便客户端可以通过HEAD请求检查它是否已经拥有压缩数据的最新副本,而不是必须下载文件并检查。

请注意,尽管此示例可以工作,但由于缺乏输入验证和其他检查,将其直接部署不是一个好主意。

import os.path
from flask import Flask, Response
from zipstream import ZipStream

app = Flask(__name__)

@app.route("/", defaults={"path": "."})
@app.route("/<path:path>")
def stream_zip(path):
    name = os.path.basename(os.path.abspath(path))
    zs = ZipStream.from_path(path)
    return Response(
        zs,
        mimetype="application/zip",
        headers={
            "Content-Disposition": f"attachment; filename={name}.zip",
            "Content-Length": len(zs),
            "Last-Modified": zs.last_modified,
        }
    )

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)

部分生成和最后时刻的文件添加

可以生成zip流,但在最终化之前停止。这允许在添加所有文件之后添加类似文件清单或压缩日志的内容。

ZipStream提供了一个info_list方法,它返回流中添加的所有文件的信息。在此示例中,所有这些信息将在最终化之前添加到名为"manifest.json"的文件中。

from zipstream import ZipStream
import json

def gen_zipfile()
    zs = ZipStream.from_path("/path/to/files")
    yield from zs.all_files()
    zs.add(
        json.dumps(
            zs.info_list(),
            indent=2
        ),
        "manifest.json"
    )
    yield from zs.finalize()

与stdlib的比较

从Python 3.6开始,实际上已经可以使用标准库来生成作为流的zip文件,但这并不方便或高效。考虑将文件目录压缩并作为网络连接流传输的典型用例

(注意,在此情况下,没有预先计算流的大小,因为这会使stdlib示例过于冗长)。

使用ZipStream

from zipstream import ZipStream

send_stream(
    ZipStream.from_path("/path/to/files/")
)
仅使用stdlib实现的相同(几乎)功能
import os
import io
from zipfile import ZipFile, ZipInfo

class Stream(io.RawIOBase):
    """An unseekable stream for the ZipFile to write to"""

    def __init__(self):
        self._buffer = bytearray()
        self._closed = False

    def close(self):
        self._closed = True

    def write(self, b):
        if self._closed:
            raise ValueError("Can't write to a closed stream")
        self._buffer += b
        return len(b)

    def readall(self):
        chunk = bytes(self._buffer)
        self._buffer.clear()
        return chunk

def iter_files(path):
    for dirpath, _, files in os.walk(path, followlinks=True):
        if not files:
            yield dirpath  # Preserve empty directories
        for f in files:
            yield os.path.join(dirpath, f)

def read_file(path):
    with open(path, "rb") as fp:
        while True:
            buf = fp.read(1024 * 64)
            if not buf:
                break
            yield buf

def generate_zipstream(path):
    stream = Stream()
    with ZipFile(stream, mode="w") as zf:
        toplevel = os.path.basename(os.path.normpath(path))
        for f in iter_files(path):
            # Use the basename of the path to set the arcname
            arcname = os.path.join(toplevel, os.path.relpath(f, path))
            zinfo = ZipInfo.from_file(f, arcname)

            # Write data to the zip file then yield the stream content
            with zf.open(zinfo, mode="w") as fp:
                if zinfo.is_dir():
                    continue
                for buf in read_file(f):
                    fp.write(buf)
                    yield stream.readall()
    yield stream.readall()

send_stream(
    generate_zipstream("/path/to/files/")
)

测试

此包包含广泛的测试。要运行它们,请安装pytestpip install pytest)并在项目目录中运行py.test

许可证

GNU LGPLv3下许可。

项目详情


下载文件

下载适合您平台文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。

源分发

zipstream-ng-1.7.1.tar.gz (35.5 kB 查看哈希值)

上传时间 源代码

构建分发版

zipstream_ng-1.7.1-py3-none-any.whl (22.9 kB 查看哈希值)

上传时间 Python 3

由以下支持