Source code for PIL.PngImagePlugin

#
# The Python Imaging Library.
# $Id$
#
# PNG support code
#
# See "PNG (Portable Network Graphics) Specification, version 1.0;
# W3C Recommendation", 1996-10-01, Thomas Boutell (ed.).
#
# history:
# 1996-05-06 fl   Created (couldn't resist it)
# 1996-12-14 fl   Upgraded, added read and verify support (0.2)
# 1996-12-15 fl   Separate PNG stream parser
# 1996-12-29 fl   Added write support, added getchunks
# 1996-12-30 fl   Eliminated circular references in decoder (0.3)
# 1998-07-12 fl   Read/write 16-bit images as mode I (0.4)
# 2001-02-08 fl   Added transparency support (from Zircon) (0.5)
# 2001-04-16 fl   Don't close data source in "open" method (0.6)
# 2004-02-24 fl   Don't even pretend to support interlaced files (0.7)
# 2004-08-31 fl   Do basic sanity check on chunk identifiers (0.8)
# 2004-09-20 fl   Added PngInfo chunk container
# 2004-12-18 fl   Added DPI read support (based on code by Niki Spahiev)
# 2008-08-13 fl   Added tRNS support for RGB images
# 2009-03-06 fl   Support for preserving ICC profiles (by Florian Hoech)
# 2009-03-08 fl   Added zTXT support (from Lowell Alleman)
# 2009-03-29 fl   Read interlaced PNG files (from Conrado Porto Lopes Gouvua)
#
# Copyright (c) 1997-2009 by Secret Labs AB
# Copyright (c) 1996 by Fredrik Lundh
#
# See the README file for information on usage and redistribution.
#
from __future__ import annotations

import itertools
import logging
import re
import struct
import warnings
import zlib
from collections.abc import Callable
from enum import IntEnum
from typing import IO, Any, NamedTuple, NoReturn, cast

from . import Image, ImageChops, ImageFile, ImagePalette, ImageSequence
from ._binary import i16be as i16
from ._binary import i32be as i32
from ._binary import o8
from ._binary import o16be as o16
from ._binary import o32be as o32
from ._util import DeferredError

TYPE_CHECKING = False
if TYPE_CHECKING:
    from . import _imaging

# Module-level logger; chunk parsing emits debug records through it.
logger = logging.getLogger(__name__)

# Matcher for chunk identifiers: succeeds when the value starts with four
# ``\w`` bytes.  Used by ChunkStream.read() as a basic sanity check.
is_cid = re.compile(rb"\w\w\w\w").match


# The 8-byte signature every PNG file starts with (checked by _accept()).
_MAGIC = b"\211PNG\r\n\032\n"


# Keyed by (IHDR byte 8, IHDR byte 9); each value is the (mode, rawmode)
# pair that chunk_IHDR stores as im_mode / im_rawmode.
_MODES = {
    # supported bits/color combinations, and corresponding modes/rawmodes
    # Grayscale
    (1, 0): ("1", "1"),
    (2, 0): ("L", "L;2"),
    (4, 0): ("L", "L;4"),
    (8, 0): ("L", "L"),
    (16, 0): ("I;16", "I;16B"),
    # Truecolour
    (8, 2): ("RGB", "RGB"),
    (16, 2): ("RGB", "RGB;16B"),
    # Indexed-colour
    (1, 3): ("P", "P;1"),
    (2, 3): ("P", "P;2"),
    (4, 3): ("P", "P;4"),
    (8, 3): ("P", "P"),
    # Grayscale with alpha
    (8, 4): ("LA", "LA"),
    (16, 4): ("RGBA", "LA;16B"),  # LA;16B->LA not yet available
    # Truecolour with alpha
    (8, 6): ("RGBA", "RGBA"),
    (16, 6): ("RGBA", "RGBA;16B"),
}


# Matches tRNS data made of 0xff bytes around a single 0x00 byte; chunk_tRNS
# then stores the index of that zero byte instead of the whole alpha table.
_simple_palette = re.compile(b"^\xff*\x00\xff*$")

# Upper bound passed to zlib when inflating a single compressed chunk
# (see _safe_zlib_decompress).
MAX_TEXT_CHUNK = ImageFile.SAFEBLOCK
"""
Maximum decompressed size for a iTXt or zTXt chunk.
Eliminates decompression bombs where compressed chunks can expand 1000x.
See :ref:`Text in PNG File Format<png-text>`.
"""
# Cumulative limit enforced by PngStream.check_text_memory().
MAX_TEXT_MEMORY = 64 * MAX_TEXT_CHUNK
"""
Set the maximum total text chunk size.
See :ref:`Text in PNG File Format<png-text>`.
"""


# APNG frame disposal modes
class Disposal(IntEnum):
    """
    Values of the APNG frame disposal operation.

    The reader stores byte 24 of each ``fcTL`` chunk in ``info["disposal"]``
    and compares it against these members.
    """

    OP_NONE = 0
    """
    No disposal is done on this frame before rendering the next frame.
    See :ref:`Saving APNG sequences<apng-saving>`.
    """
    OP_BACKGROUND = 1
    """
    This frame’s modified region is cleared to fully transparent black before
    rendering the next frame.
    See :ref:`Saving APNG sequences<apng-saving>`.
    """
    OP_PREVIOUS = 2
    """
    This frame’s modified region is reverted to the previous frame’s contents before
    rendering the next frame.
    See :ref:`Saving APNG sequences<apng-saving>`.
    """
# APNG frame blend modes
class Blend(IntEnum):
    """
    Values of the APNG frame blend operation.

    The reader stores byte 25 of each ``fcTL`` chunk in ``info["blend"]``
    and compares it against these members.
    """

    OP_SOURCE = 0
    """
    All color components of this frame, including alpha, overwrite the previous output
    image contents.
    See :ref:`Saving APNG sequences<apng-saving>`.
    """
    OP_OVER = 1
    """
    This frame should be alpha composited with the previous output image contents.
    See :ref:`Saving APNG sequences<apng-saving>`.
    """
def _safe_zlib_decompress(s: bytes) -> bytes: dobj = zlib.decompressobj() plaintext = dobj.decompress(s, MAX_TEXT_CHUNK) if dobj.unconsumed_tail: msg = "Decompressed data too large for PngImagePlugin.MAX_TEXT_CHUNK" raise ValueError(msg) return plaintext def _crc32(data: bytes, seed: int = 0) -> int: return zlib.crc32(data, seed) & 0xFFFFFFFF # -------------------------------------------------------------------- # Support classes. Suitable for PNG and related formats like MNG etc.
class ChunkStream:
    """
    Reader for a file made of length / identifier / data / CRC chunks.

    Chunk payloads are handled by ``chunk_<cid>`` methods that subclasses
    provide; this class only reads headers and checksums.
    """

    def __init__(self, fp: IO[bytes]) -> None:
        self.fp: IO[bytes] | None = fp
        # Headers handed back through push(); read() serves these first.
        self.queue: list[tuple[bytes, int, int]] | None = []

    def read(self) -> tuple[bytes, int, int]:
        """
        Fetch the next chunk header.

        :returns: ``(cid, pos, length)`` where ``pos`` is the file offset of
           the chunk data
        :raises SyntaxError: if the identifier fails the sanity check and
           ``ImageFile.LOAD_TRUNCATED_IMAGES`` is not set
        """
        assert self.fp is not None
        if self.queue:
            # A previously pushed header: reposition the file on its data.
            cid, pos, length = self.queue.pop()
            self.fp.seek(pos)
        else:
            header = self.fp.read(8)
            cid = header[4:]
            pos = self.fp.tell()
            length = i32(header)

        if not is_cid(cid) and not ImageFile.LOAD_TRUNCATED_IMAGES:
            msg = f"broken PNG file (chunk {repr(cid)})"
            raise SyntaxError(msg)

        return cid, pos, length

    def __enter__(self) -> ChunkStream:
        return self

    def __exit__(self, *args: object) -> None:
        self.close()

    def close(self) -> None:
        """Drop the references to the pending queue and the file object."""
        self.queue = None
        self.fp = None

    def push(self, cid: bytes, pos: int, length: int) -> None:
        """Hand a chunk header back so the next read() returns it again."""
        assert self.queue is not None
        entry = (cid, pos, length)
        self.queue.append(entry)

    def call(self, cid: bytes, pos: int, length: int) -> bytes:
        """Dispatch to the ``chunk_<cid>`` handler and return its result."""
        logger.debug("STREAM %r %s %s", cid, pos, length)
        handler = getattr(self, f"chunk_{cid.decode('ascii')}")
        return handler(pos, length)

    def crc(self, cid: bytes, data: bytes) -> None:
        """
        Read the stored checksum and compare it with the computed one.

        :raises SyntaxError: if the checksum is wrong or incomplete
        """
        # When truncated images are allowed, ancillary chunks are not
        # verified; bit 5 of the first identifier byte marks a chunk as
        # ancillary [specs, section 5.4].
        if ImageFile.LOAD_TRUNCATED_IMAGES and (cid[0] >> 5 & 1):
            self.crc_skip(cid, data)
            return

        assert self.fp is not None
        expected = _crc32(data, _crc32(cid))
        try:
            stored = i32(self.fp.read(4))
        except struct.error as e:
            msg = f"broken PNG file (incomplete checksum in {repr(cid)})"
            raise SyntaxError(msg) from e
        if expected != stored:
            msg = f"broken PNG file (bad header checksum in {repr(cid)})"
            raise SyntaxError(msg)

    def crc_skip(self, cid: bytes, data: bytes) -> None:
        """Consume the 4 checksum bytes without checking them."""
        assert self.fp is not None
        self.fp.read(4)

    def verify(self, endchunk: bytes = b"IEND") -> list[bytes]:
        """
        Checksum every remaining chunk up to ``endchunk``.

        Must be called directly after open.

        :returns: identifiers of the chunks that were checked
        :raises OSError: if the file ends before ``endchunk`` is found
        """
        # Simple approach; just calculate checksum for all remaining blocks.
        seen: list[bytes] = []

        assert self.fp is not None
        while True:
            try:
                cid, pos, length = self.read()
            except struct.error as e:
                msg = "truncated PNG file"
                raise OSError(msg) from e

            if cid == endchunk:
                return seen
            payload = ImageFile._safe_read(self.fp, length)
            self.crc(cid, payload)
            seen.append(cid)
class iTXt(str):
    """
    ``str`` subclass for iTXt chunk values: behaves like the text itself
    while also carrying the language tag and translated keyword.
    """

    lang: str | bytes | None
    tkey: str | bytes | None

    @staticmethod
    def __new__(
        cls, text: str, lang: str | None = None, tkey: str | None = None
    ) -> iTXt:
        """
        :param cls: the class to use when creating the instance
        :param text: value for this key
        :param lang: language code
        :param tkey: UTF-8 version of the key name
        """
        instance = str.__new__(cls, text)
        instance.lang, instance.tkey = lang, tkey
        return instance
class PngInfo:
    """
    Container of extra PNG chunks (for use with save(pnginfo=)).

    Each entry of ``chunks`` is a ``(cid, data, after_idat)`` tuple.
    """

    def __init__(self) -> None:
        self.chunks: list[tuple[bytes, bytes, bool]] = []

    def add(self, cid: bytes, data: bytes, after_idat: bool = False) -> None:
        """Appends an arbitrary chunk. Use with caution.

        :param cid: a byte string, 4 bytes long.
        :param data: a byte string of the encoded data
        :param after_idat: for use with private chunks. Whether the chunk
                           should be written after IDAT

        """
        entry = (cid, data, after_idat)
        self.chunks.append(entry)

    def add_itxt(
        self,
        key: str | bytes,
        value: str | bytes,
        lang: str | bytes = "",
        tkey: str | bytes = "",
        zip: bool = False,
    ) -> None:
        """Appends an iTXt chunk.

        :param key: latin-1 encodable text key name
        :param value: value for this key
        :param lang: language code
        :param tkey: UTF-8 version of the key name
        :param zip: compression flag

        """
        # Normalise every field to bytes; the key is latin-1, the rest UTF-8.
        key_b = key if isinstance(key, bytes) else key.encode("latin-1", "strict")
        value_b = value if isinstance(value, bytes) else value.encode("utf-8", "strict")
        lang_b = lang if isinstance(lang, bytes) else lang.encode("utf-8", "strict")
        tkey_b = tkey if isinstance(tkey, bytes) else tkey.encode("utf-8", "strict")

        # Compression flag byte, followed by the (possibly deflated) text.
        if zip:
            flag, payload = b"\x01", zlib.compress(value_b)
        else:
            flag, payload = b"\0", value_b

        self.add(
            b"iTXt",
            key_b + b"\0" + flag + b"\0" + lang_b + b"\0" + tkey_b + b"\0" + payload,
        )

    def add_text(
        self, key: str | bytes, value: str | bytes | iTXt, zip: bool = False
    ) -> None:
        """Appends a text chunk.

        :param key: latin-1 encodable text key name
        :param value: value for this key, text or an
           :py:class:`PIL.PngImagePlugin.iTXt` instance
        :param zip: compression flag

        """
        if isinstance(value, iTXt):
            lang = b"" if value.lang is None else value.lang
            tkey = b"" if value.tkey is None else value.tkey
            self.add_itxt(key, value, lang, tkey, zip=zip)
            return

        # The tEXt chunk stores latin-1 text; anything that cannot be
        # encoded that way is written as iTXt instead.
        if not isinstance(value, bytes):
            try:
                value = value.encode("latin-1", "strict")
            except UnicodeError:
                self.add_itxt(key, value, zip=zip)
                return

        if not isinstance(key, bytes):
            key = key.encode("latin-1", "strict")

        if not zip:
            self.add(b"tEXt", key + b"\0" + value)
            return
        self.add(b"zTXt", key + b"\0\0" + zlib.compress(value))
# -------------------------------------------------------------------- # PNG image stream (IHDR/IEND) class _RewindState(NamedTuple): info: dict[str | tuple[int, int], Any] tile: list[ImageFile._Tile] seq_num: int | None
class PngStream(ChunkStream):
    """
    Chunk handlers for PNG and APNG files.

    Each ``chunk_XXXX`` method is invoked through :py:meth:`ChunkStream.call`
    with the file positioned at the start of the chunk data.  Parsed values
    are collected in the ``im_*`` attributes, which the image plugin copies
    after the headers have been read.
    """

    def __init__(self, fp: IO[bytes]) -> None:
        super().__init__(fp)

        # local copies of Image attributes
        self.im_info: dict[str | tuple[int, int], Any] = {}
        self.im_text: dict[str, str | iTXt] = {}
        self.im_size = (0, 0)
        self.im_mode = ""
        self.im_tile: list[ImageFile._Tile] = []
        self.im_palette: tuple[str, bytes] | None = None
        self.im_custom_mimetype: str | None = None
        self.im_n_frames: int | None = None
        self._seq_num: int | None = None
        self.rewind_state = _RewindState({}, [], None)

        # running total of decoded text length, see check_text_memory()
        self.text_memory = 0

    def check_text_memory(self, chunklen: int) -> None:
        """
        Add ``chunklen`` to the running text total.

        :raises ValueError: if the total exceeds ``MAX_TEXT_MEMORY``
        """
        self.text_memory += chunklen
        if self.text_memory > MAX_TEXT_MEMORY:
            msg = (
                "Too much memory used in text chunks: "
                f"{self.text_memory}>MAX_TEXT_MEMORY"
            )
            raise ValueError(msg)

    def save_rewind(self) -> None:
        """Record the current info, tile and sequence number for rewind()."""
        self.rewind_state = _RewindState(
            self.im_info.copy(),
            self.im_tile,
            self._seq_num,
        )

    def rewind(self) -> None:
        """Restore the state recorded by save_rewind()."""
        self.im_info = self.rewind_state.info.copy()
        self.im_tile = self.rewind_state.tile
        self._seq_num = self.rewind_state.seq_num

    def chunk_iCCP(self, pos: int, length: int) -> bytes:
        """
        ICC profile.  Stores the decompressed profile, or ``None`` if it could
        not be decompressed, in ``im_info["icc_profile"]``.

        :raises SyntaxError: if the compression method is not 0
        """
        assert self.fp is not None
        s = ImageFile._safe_read(self.fp, length)
        # according to PNG spec, the iCCP chunk contains:
        # Profile name  1-79 bytes (character string)
        # Null separator        1 byte (null character)
        # Compression method    1 byte (0)
        # Compressed profile    n bytes (zlib with deflate compression)
        i = s.find(b"\0")
        logger.debug("iCCP profile name %r", s[:i])
        comp_method = s[i + 1]
        logger.debug("Compression method %s", comp_method)
        if comp_method != 0:
            msg = f"Unknown compression method {comp_method} in iCCP chunk"
            raise SyntaxError(msg)
        try:
            icc_profile = _safe_zlib_decompress(s[i + 2 :])
        except ValueError:
            if ImageFile.LOAD_TRUNCATED_IMAGES:
                icc_profile = None
            else:
                raise
        except zlib.error:
            icc_profile = None  # FIXME
        self.im_info["icc_profile"] = icc_profile
        return s

    def chunk_IHDR(self, pos: int, length: int) -> bytes:
        """
        Image header.  Sets ``im_size`` and, for supported bit depth / colour
        type combinations, ``im_mode`` and ``im_rawmode``.

        :raises ValueError: if the chunk is shorter than 13 bytes and
           truncated images are not allowed
        :raises SyntaxError: if the filter method byte is non-zero
        """
        assert self.fp is not None
        s = ImageFile._safe_read(self.fp, length)
        if length < 13:
            if ImageFile.LOAD_TRUNCATED_IMAGES:
                return s
            msg = "Truncated IHDR chunk"
            raise ValueError(msg)
        self.im_size = i32(s, 0), i32(s, 4)
        try:
            self.im_mode, self.im_rawmode = _MODES[(s[8], s[9])]
        except KeyError:
            # Combination not listed in _MODES: leave the mode unset.  Only
            # the failed lookup is tolerated; anything else should surface.
            pass
        if s[12]:
            self.im_info["interlace"] = 1
        if s[11]:
            msg = "unknown filter category"
            raise SyntaxError(msg)
        return s

    def chunk_IDAT(self, pos: int, length: int) -> NoReturn:
        """
        Image data.  Records the tile descriptor and chunk length, then
        raises ``EOFError`` to signal that image data has been reached.
        """
        if "bbox" in self.im_info:
            tile = [ImageFile._Tile("zip", self.im_info["bbox"], pos, self.im_rawmode)]
        else:
            if self.im_n_frames is not None:
                # acTL was seen but no fcTL precedes this data
                self.im_info["default_image"] = True
            tile = [ImageFile._Tile("zip", (0, 0) + self.im_size, pos, self.im_rawmode)]
        self.im_tile = tile
        self.im_idat = length
        msg = "image data found"
        raise EOFError(msg)

    def chunk_IEND(self, pos: int, length: int) -> NoReturn:
        """End of image; always raises ``EOFError``."""
        msg = "end of PNG image"
        raise EOFError(msg)

    def chunk_PLTE(self, pos: int, length: int) -> bytes:
        """Palette.  Kept as ``("RGB", data)`` only for mode "P" images."""
        assert self.fp is not None
        s = ImageFile._safe_read(self.fp, length)
        if self.im_mode == "P":
            self.im_palette = "RGB", s
        return s

    def chunk_tRNS(self, pos: int, length: int) -> bytes:
        """Transparency.  Fills ``im_info["transparency"]`` based on mode."""
        assert self.fp is not None
        s = ImageFile._safe_read(self.fp, length)
        if self.im_mode == "P":
            if _simple_palette.match(s):
                # tRNS contains only one full-transparent entry,
                # other entries are full opaque
                i = s.find(b"\0")
                if i >= 0:
                    self.im_info["transparency"] = i
            else:
                # otherwise, we have a byte string with one alpha value
                # for each palette entry
                self.im_info["transparency"] = s
        elif self.im_mode in ("1", "L", "I;16"):
            self.im_info["transparency"] = i16(s)
        elif self.im_mode == "RGB":
            self.im_info["transparency"] = i16(s), i16(s, 2), i16(s, 4)
        return s

    def chunk_gAMA(self, pos: int, length: int) -> bytes:
        """Gamma setting; the stored integer is divided by 100000."""
        assert self.fp is not None
        s = ImageFile._safe_read(self.fp, length)
        self.im_info["gamma"] = i32(s) / 100000.0
        return s

    def chunk_cHRM(self, pos: int, length: int) -> bytes:
        """Chromaticity values, stored as a tuple of floats."""
        # chromaticity, 8 unsigned ints, actual value is scaled by 100,000
        # WP x,y, Red x,y, Green x,y Blue x,y

        assert self.fp is not None
        s = ImageFile._safe_read(self.fp, length)
        raw_vals = struct.unpack(f">{len(s) // 4}I", s)
        self.im_info["chromaticity"] = tuple(elt / 100000.0 for elt in raw_vals)
        return s

    def chunk_sRGB(self, pos: int, length: int) -> bytes:
        """
        sRGB rendering intent, stored in ``im_info["srgb"]``.

        :raises ValueError: if the chunk is empty and truncated images are
           not allowed
        """
        # srgb rendering intent, 1 byte
        # 0 perceptual
        # 1 relative colorimetric
        # 2 saturation
        # 3 absolute colorimetric

        assert self.fp is not None
        s = ImageFile._safe_read(self.fp, length)
        if length < 1:
            if ImageFile.LOAD_TRUNCATED_IMAGES:
                return s
            msg = "Truncated sRGB chunk"
            raise ValueError(msg)
        self.im_info["srgb"] = s[0]
        return s

    def chunk_pHYs(self, pos: int, length: int) -> bytes:
        """
        Pixels per unit.  Sets ``im_info["dpi"]`` when the unit is the meter
        and ``im_info["aspect"]`` when the unit byte is 0.

        :raises ValueError: if the chunk is shorter than 9 bytes and
           truncated images are not allowed
        """
        assert self.fp is not None
        s = ImageFile._safe_read(self.fp, length)
        if length < 9:
            if ImageFile.LOAD_TRUNCATED_IMAGES:
                return s
            msg = "Truncated pHYs chunk"
            raise ValueError(msg)
        px, py = i32(s, 0), i32(s, 4)
        unit = s[8]
        if unit == 1:  # meter
            dpi = px * 0.0254, py * 0.0254
            self.im_info["dpi"] = dpi
        elif unit == 0:
            self.im_info["aspect"] = px, py
        return s

    def chunk_tEXt(self, pos: int, length: int) -> bytes:
        """Uncompressed latin-1 text; stored in ``im_info`` and ``im_text``."""
        assert self.fp is not None
        s = ImageFile._safe_read(self.fp, length)
        try:
            k, v = s.split(b"\0", 1)
        except ValueError:
            # fallback for broken tEXt tags
            k = s
            v = b""
        if k:
            k_str = k.decode("latin-1", "strict")
            v_str = v.decode("latin-1", "replace")

            # the value of an "exif" key is kept as raw bytes in im_info
            self.im_info[k_str] = v if k == b"exif" else v_str
            self.im_text[k_str] = v_str
            self.check_text_memory(len(v_str))

        return s

    def chunk_zTXt(self, pos: int, length: int) -> bytes:
        """
        Compressed latin-1 text; stored in ``im_info`` and ``im_text``.

        :raises SyntaxError: if the compression method is not 0
        """
        assert self.fp is not None
        s = ImageFile._safe_read(self.fp, length)
        try:
            k, v = s.split(b"\0", 1)
        except ValueError:
            k = s
            v = b""
        if v:
            comp_method = v[0]
        else:
            comp_method = 0
        if comp_method != 0:
            msg = f"Unknown compression method {comp_method} in zTXt chunk"
            raise SyntaxError(msg)
        try:
            v = _safe_zlib_decompress(v[1:])
        except ValueError:
            if ImageFile.LOAD_TRUNCATED_IMAGES:
                v = b""
            else:
                raise
        except zlib.error:
            v = b""

        if k:
            k_str = k.decode("latin-1", "strict")
            v_str = v.decode("latin-1", "replace")

            self.im_info[k_str] = self.im_text[k_str] = v_str
            self.check_text_memory(len(v_str))

        return s

    def chunk_iTXt(self, pos: int, length: int) -> bytes:
        """
        International text.  Malformed or undecodable chunks are skipped;
        otherwise the value is stored as an :py:class:`iTXt` instance.
        """
        assert self.fp is not None
        r = s = ImageFile._safe_read(self.fp, length)
        try:
            k, r = r.split(b"\0", 1)
        except ValueError:
            return s
        if len(r) < 2:
            return s
        # compression flag, compression method, remainder
        cf, cm, r = r[0], r[1], r[2:]
        try:
            lang, tk, v = r.split(b"\0", 2)
        except ValueError:
            return s
        if cf != 0:
            if cm == 0:
                try:
                    v = _safe_zlib_decompress(v)
                except ValueError:
                    if ImageFile.LOAD_TRUNCATED_IMAGES:
                        return s
                    else:
                        raise
                except zlib.error:
                    return s
            else:
                return s
        if k == b"XML:com.adobe.xmp":
            # XMP data is additionally exposed as raw bytes
            self.im_info["xmp"] = v
        try:
            k_str = k.decode("latin-1", "strict")
            lang_str = lang.decode("utf-8", "strict")
            tk_str = tk.decode("utf-8", "strict")
            v_str = v.decode("utf-8", "strict")
        except UnicodeError:
            return s

        self.im_info[k_str] = self.im_text[k_str] = iTXt(v_str, lang_str, tk_str)
        self.check_text_memory(len(v_str))

        return s

    def chunk_eXIf(self, pos: int, length: int) -> bytes:
        """Exif data, stored with a ``b"Exif\\x00\\x00"`` prefix."""
        assert self.fp is not None
        s = ImageFile._safe_read(self.fp, length)
        self.im_info["exif"] = b"Exif\x00\x00" + s
        return s

    # APNG chunks
    def chunk_acTL(self, pos: int, length: int) -> bytes:
        """
        Animation control: frame count and loop count.

        A repeated chunk or an invalid frame count only produces a warning.

        :raises ValueError: if the chunk is shorter than 8 bytes and
           truncated images are not allowed
        """
        assert self.fp is not None
        s = ImageFile._safe_read(self.fp, length)
        if length < 8:
            if ImageFile.LOAD_TRUNCATED_IMAGES:
                return s
            msg = "APNG contains truncated acTL chunk"
            raise ValueError(msg)
        if self.im_n_frames is not None:
            # a second acTL chunk: discard the animation information
            self.im_n_frames = None
            warnings.warn("Invalid APNG, will use default PNG image if possible")
            return s
        n_frames = i32(s)
        if n_frames == 0 or n_frames > 0x80000000:
            warnings.warn("Invalid APNG, will use default PNG image if possible")
            return s
        self.im_n_frames = n_frames
        self.im_info["loop"] = i32(s, 4)
        self.im_custom_mimetype = "image/apng"
        return s

    def chunk_fcTL(self, pos: int, length: int) -> bytes:
        """
        Frame control: region, duration, disposal and blend of one frame.

        :raises ValueError: if the chunk is shorter than 26 bytes and
           truncated images are not allowed
        :raises SyntaxError: if the sequence number is out of order or the
           frame region extends beyond the image
        """
        assert self.fp is not None
        s = ImageFile._safe_read(self.fp, length)
        if length < 26:
            if ImageFile.LOAD_TRUNCATED_IMAGES:
                return s
            msg = "APNG contains truncated fcTL chunk"
            raise ValueError(msg)
        seq = i32(s)
        # the first sequence number must be 0, later ones must increase by 1
        if (self._seq_num is None and seq != 0) or (
            self._seq_num is not None and self._seq_num != seq - 1
        ):
            msg = "APNG contains frame sequence errors"
            raise SyntaxError(msg)
        self._seq_num = seq
        width, height = i32(s, 4), i32(s, 8)
        px, py = i32(s, 12), i32(s, 16)
        im_w, im_h = self.im_size
        if px + width > im_w or py + height > im_h:
            msg = "APNG contains invalid frames"
            raise SyntaxError(msg)
        self.im_info["bbox"] = (px, py, px + width, py + height)
        delay_num, delay_den = i16(s, 20), i16(s, 22)
        if delay_den == 0:
            delay_den = 100
        self.im_info["duration"] = float(delay_num) / float(delay_den) * 1000
        self.im_info["disposal"] = s[24]
        self.im_info["blend"] = s[25]
        return s

    def chunk_fdAT(self, pos: int, length: int) -> bytes:
        """
        Frame data: a 4-byte sequence number followed by image data, which
        is handed to chunk_IDAT().

        :raises ValueError: if the chunk is shorter than 4 bytes and
           truncated images are not allowed
        :raises SyntaxError: if the sequence number is out of order
        """
        assert self.fp is not None
        if length < 4:
            if ImageFile.LOAD_TRUNCATED_IMAGES:
                s = ImageFile._safe_read(self.fp, length)
                return s
            msg = "APNG contains truncated fDAT chunk"
            raise ValueError(msg)
        s = ImageFile._safe_read(self.fp, 4)
        seq = i32(s)
        if self._seq_num != seq - 1:
            msg = "APNG contains frame sequence errors"
            raise SyntaxError(msg)
        self._seq_num = seq
        return self.chunk_IDAT(pos + 4, length - 4)
# --------------------------------------------------------------------
# PNG reader


def _accept(prefix: bytes) -> bool:
    """Return True if ``prefix`` begins with the PNG signature."""
    signature_length = len(_MAGIC)
    return prefix[:signature_length] == _MAGIC


##
# Image plugin for PNG images.
class PngImageFile(ImageFile.ImageFile):
    """
    Image plugin for PNG and APNG files.

    Chunk parsing is delegated to a :py:class:`PngStream`; this class copies
    the parsed attributes and drives frame seeking and image data reads.
    """

    format = "PNG"
    format_description = "Portable network graphics"

    def _open(self) -> None:
        """
        Check the signature and parse chunks up to the first image data.

        :raises SyntaxError: if the file does not start with the PNG
           signature
        """
        if not _accept(self.fp.read(8)):
            msg = "not a PNG file"
            raise SyntaxError(msg)
        self._fp = self.fp
        self.__frame = 0

        #
        # Parse headers up to the first IDAT or fDAT chunk

        self.private_chunks: list[tuple[bytes, bytes] | tuple[bytes, bytes, bool]] = []
        self.png: PngStream | None = PngStream(self.fp)

        while True:
            #
            # get next chunk

            cid, pos, length = self.png.read()

            try:
                s = self.png.call(cid, pos, length)
            except EOFError:
                # raised by the IDAT / IEND handlers: stop parsing headers
                break
            except AttributeError:
                # no chunk_<cid> handler: read the data and move on
                logger.debug("%r %s %s (unknown)", cid, pos, length)
                s = ImageFile._safe_read(self.fp, length)
                if cid[1:2].islower():
                    # lowercase second letter: remember it as a private chunk
                    self.private_chunks.append((cid, s))

            self.png.crc(cid, s)

        #
        # Copy relevant attributes from the PngStream.  An alternative
        # would be to let the PngStream class modify these attributes
        # directly, but that introduces circular references which are
        # difficult to break if things go wrong in the decoder...
        # (believe me, I've tried ;-)

        self._mode = self.png.im_mode
        self._size = self.png.im_size
        self.info = self.png.im_info
        self._text: dict[str, str | iTXt] | None = None
        self.tile = self.png.im_tile
        self.custom_mimetype = self.png.im_custom_mimetype
        self.n_frames = self.png.im_n_frames or 1
        self.default_image = self.info.get("default_image", False)

        if self.png.im_palette:
            rawmode, data = self.png.im_palette
            self.palette = ImagePalette.raw(rawmode, data)

        if cid == b"fdAT":
            # the 4-byte sequence number is not image data
            self.__prepare_idat = length - 4
        else:
            self.__prepare_idat = length  # used by load_prepare()

        if self.png.im_n_frames is not None:
            self._close_exclusive_fp_after_loading = False
            self.png.save_rewind()
            self.__rewind_idat = self.__prepare_idat
            self.__rewind = self._fp.tell()
            if self.default_image:
                # IDAT chunk contains default image and not first animation frame
                self.n_frames += 1
            self._seek(0)
        self.is_animated = self.n_frames > 1

    @property
    def text(self) -> dict[str, str | iTXt]:
        """
        Text chunks of the file, keyed by keyword.

        The image is loaded on first access so that text chunks following
        the image data are included.
        """
        # experimental
        if self._text is None:
            # iTxt, tEXt and zTXt chunks may appear at the end of the file
            # So load the file to ensure that they are read

            if self.is_animated:
                frame = self.__frame
                # for APNG, seek to the final frame before loading
                self.seek(self.n_frames - 1)
            self.load()
            if self.is_animated:
                # restore the frame that was current before the access
                self.seek(frame)

        assert self._text is not None
        return self._text

    def verify(self) -> None:
        """
        Verify PNG file by checking the checksums of the remaining chunks.

        The file object is released afterwards.

        :raises RuntimeError: if the file object is no longer available
        """

        if self.fp is None:
            msg = "verify must be called directly after open"
            raise RuntimeError(msg)

        # back up to beginning of IDAT block
        self.fp.seek(self.tile[0][2] - 8)

        assert self.png is not None
        self.png.verify()
        self.png.close()

        if self._exclusive_fp:
            self.fp.close()
        self.fp = None

    def seek(self, frame: int) -> None:
        """
        Seek to the given frame, decoding the frames in between in order.

        :raises EOFError: if the file ends before ``frame`` is reached; the
           previously current frame is restored first
        """
        if not self._seek_check(frame):
            return
        if frame < self.__frame:
            # frames can only be stepped forwards, so start over
            self._seek(0, True)

        last_frame = self.__frame
        for f in range(self.__frame + 1, frame + 1):
            try:
                self._seek(f)
            except EOFError as e:
                self.seek(last_frame)
                msg = "no more images in APNG file"
                raise EOFError(msg) from e

    def _seek(self, frame: int, rewind: bool = False) -> None:
        """
        Move to frame 0, or to the frame directly after the current one.

        :param frame: 0, or the current frame number plus one
        :param rewind: with ``frame == 0``, restore the stream position and
           parser state saved while opening
        :raises ValueError: if ``frame`` is neither 0 nor the next frame
        :raises EOFError: if no further frame exists
        :raises SyntaxError: if two fcTL chunks follow each other without
           frame data in between
        """
        assert self.png is not None
        if isinstance(self._fp, DeferredError):
            raise self._fp.ex

        self.dispose: _imaging.ImagingCore | None
        dispose_extent = None
        if frame == 0:
            if rewind:
                self._fp.seek(self.__rewind)
                self.png.rewind()
                self.__prepare_idat = self.__rewind_idat
                self._im = None
                self.info = self.png.im_info
                self.tile = self.png.im_tile
                self.fp = self._fp
            self._prev_im = None
            self.dispose = None
            self.default_image = self.info.get("default_image", False)
            self.dispose_op = self.info.get("disposal")
            self.blend_op = self.info.get("blend")
            dispose_extent = self.info.get("bbox")
            self.__frame = 0
        else:
            if frame != self.__frame + 1:
                msg = f"cannot seek to frame {frame}"
                raise ValueError(msg)

            # ensure previous frame was loaded
            self.load()

            # apply the disposal prepared for the previous frame
            if self.dispose:
                self.im.paste(self.dispose, self.dispose_extent)
            self._prev_im = self.im.copy()

            self.fp = self._fp

            # advance to the next frame
            if self.__prepare_idat:
                ImageFile._safe_read(self.fp, self.__prepare_idat)
                self.__prepare_idat = 0
            frame_start = False
            while True:
                self.fp.read(4)  # CRC

                try:
                    cid, pos, length = self.png.read()
                except (struct.error, SyntaxError):
                    break

                if cid == b"IEND":
                    msg = "No more images in APNG file"
                    raise EOFError(msg)
                if cid == b"fcTL":
                    if frame_start:
                        # there must be at least one fdAT chunk between fcTL chunks
                        msg = "APNG missing frame data"
                        raise SyntaxError(msg)
                    frame_start = True

                try:
                    self.png.call(cid, pos, length)
                except UnicodeDecodeError:
                    break
                except EOFError:
                    if cid == b"fdAT":
                        length -= 4
                        if frame_start:
                            # first data chunk of the new frame: stop here
                            self.__prepare_idat = length
                            break
                    ImageFile._safe_read(self.fp, length)
                except AttributeError:
                    logger.debug("%r %s %s (unknown)", cid, pos, length)
                    ImageFile._safe_read(self.fp, length)

            self.__frame = frame
            self.tile = self.png.im_tile
            self.dispose_op = self.info.get("disposal")
            self.blend_op = self.info.get("blend")
            dispose_extent = self.info.get("bbox")

            if not self.tile:
                msg = "image not found in APNG frame"
                raise EOFError(msg)
        if dispose_extent:
            self.dispose_extent: tuple[float, float, float, float] = dispose_extent

        # setup frame disposal (actual disposal done when needed in the next _seek())
        if self._prev_im is None and self.dispose_op == Disposal.OP_PREVIOUS:
            # without a previous image, fall back to background disposal
            self.dispose_op = Disposal.OP_BACKGROUND

        self.dispose = None
        if self.dispose_op == Disposal.OP_PREVIOUS:
            if self._prev_im:
                self.dispose = self._prev_im.copy()
                self.dispose = self._crop(self.dispose, self.dispose_extent)
        elif self.dispose_op == Disposal.OP_BACKGROUND:
            self.dispose = Image.core.fill(self.mode, self.size)
            self.dispose = self._crop(self.dispose, self.dispose_extent)

    def tell(self) -> int:
        """Return the current frame number."""
        return self.__frame

    def load_prepare(self) -> None:
        """internal: prepare to read PNG file"""

        if self.info.get("interlace"):
            self.decoderconfig = self.decoderconfig + (1,)

        self.__idat = self.__prepare_idat  # used by load_read()
        ImageFile.ImageFile.load_prepare(self)

    def load_read(self, read_bytes: int) -> bytes:
        """
        internal: read more image data

        :param read_bytes: maximum number of bytes to return; a value of 0
           or less means the rest of the current chunk
        :returns: image data, or ``b""`` once a chunk that is not image
           data is reached
        """

        assert self.png is not None
        while self.__idat == 0:
            # end of chunk, skip forward to next one

            self.fp.read(4)  # CRC

            cid, pos, length = self.png.read()

            if cid not in [b"IDAT", b"DDAT", b"fdAT"]:
                # not image data: hand the header back for load_end()
                self.png.push(cid, pos, length)
                return b""

            if cid == b"fdAT":
                try:
                    self.png.call(cid, pos, length)
                except EOFError:
                    pass
                self.__idat = length - 4  # sequence_num has already been read
            else:
                self.__idat = length  # empty chunks are allowed

        # read more data from this chunk
        if read_bytes <= 0:
            read_bytes = self.__idat
        else:
            read_bytes = min(read_bytes, self.__idat)

        self.__idat = self.__idat - read_bytes

        return self.fp.read(read_bytes)

    def load_end(self) -> None:
        """
        internal: finished reading image data

        Processes the chunks that follow the image data, then either closes
        the parser (single image) or blends the frame onto the previous one
        (animation).
        """
        assert self.png is not None
        if self.__idat != 0:
            # skip whatever is left of the current data chunk
            self.fp.read(self.__idat)
        while True:
            self.fp.read(4)  # CRC

            try:
                cid, pos, length = self.png.read()
            except (struct.error, SyntaxError):
                break

            if cid == b"IEND":
                break
            elif cid == b"fcTL" and self.is_animated:
                # start of the next frame, stop reading
                self.__prepare_idat = 0
                self.png.push(cid, pos, length)
                break

            try:
                self.png.call(cid, pos, length)
            except UnicodeDecodeError:
                break
            except EOFError:
                if cid == b"fdAT":
                    length -= 4
                try:
                    ImageFile._safe_read(self.fp, length)
                except OSError as e:
                    if ImageFile.LOAD_TRUNCATED_IMAGES:
                        break
                    else:
                        raise e
            except AttributeError:
                logger.debug("%r %s %s (unknown)", cid, pos, length)
                s = ImageFile._safe_read(self.fp, length)
                if cid[1:2].islower():
                    # third element marks the chunk as found after image data
                    self.private_chunks.append((cid, s, True))
        self._text = self.png.im_text
        if not self.is_animated:
            self.png.close()
            self.png = None
        else:
            if self._prev_im and self.blend_op == Blend.OP_OVER:
                updated = self._crop(self.im, self.dispose_extent)
                if self.im.mode == "RGB" and "transparency" in self.info:
                    mask = updated.convert_transparent(
                        "RGBA", self.info["transparency"]
                    )
                else:
                    if self.im.mode == "P" and "transparency" in self.info:
                        t = self.info["transparency"]
                        if isinstance(t, bytes):
                            updated.putpalettealphas(t)
                        elif isinstance(t, int):
                            updated.putpalettealpha(t)
                    mask = updated.convert("RGBA")
                self._prev_im.paste(updated, self.dispose_extent, mask)
                self.im = self._prev_im

    def _getexif(self) -> dict[int, Any] | None:
        """
        Return the merged Exif dictionary, loading the image first if no
        Exif data has been seen yet, or None if the file has none.
        """
        if "exif" not in self.info:
            self.load()
        if "exif" not in self.info and "Raw profile type exif" not in self.info:
            return None
        return self.getexif()._get_merged_dict()

    def getexif(self) -> Image.Exif:
        """
        Return the Exif data, loading the image first if no Exif data has
        been seen yet.
        """
        if "exif" not in self.info:
            self.load()

        return super().getexif()
# -------------------------------------------------------------------- # PNG writer _OUTMODES = { # supported PIL modes, and corresponding rawmode, bit depth and color type "1": ("1", b"\x01", b"\x00"), "L;1": ("L;1", b"\x01", b"\x00"), "L;2": ("L;2", b"\x02", b"\x00"), "L;4": ("L;4", b"\x04", b"\x00"), "L": ("L", b"\x08", b"\x00"), "LA": ("LA", b"\x08", b"\x04"), "I": ("I;16B", b"\x10", b"\x00"), "I;16": ("I;16B", b"\x10", b"\x00"), "I;16B": ("I;16B", b"\x10", b"\x00"), "P;1": ("P;1", b"\x01", b"\x03"), "P;2": ("P;2", b"\x02", b"\x03"), "P;4": ("P;4", b"\x04", b"\x03"), "P": ("P", b"\x08", b"\x03"), "RGB": ("RGB", b"\x08", b"\x02"), "RGBA": ("RGBA", b"\x08", b"\x06"), }
def putchunk(fp: IO[bytes], cid: bytes, *data: bytes) -> None:
    """
    Write a PNG chunk (including CRC field).

    :param fp: file object to write to
    :param cid: chunk identifier
    :param data: pieces of chunk data, concatenated before writing
    """
    payload = b"".join(data)
    header = o32(len(payload)) + cid

    fp.write(header)
    fp.write(payload)
    # the checksum covers the identifier followed by the data
    fp.write(o32(_crc32(payload, _crc32(cid))))
class _idat:
    """File-like sink that wraps output from the encoder in IDAT chunks."""

    def __init__(self, fp: IO[bytes], chunk: Callable[..., None]) -> None:
        self.fp = fp
        self.chunk = chunk

    def write(self, data: bytes) -> None:
        """Emit ``data`` as one IDAT chunk through the chunk writer."""
        self.chunk(self.fp, b"IDAT", data)


class _fdat:
    """File-like sink that wraps encoder output in fdAT chunks.

    Every chunk is prefixed with the current sequence number, which is
    incremented after each write; callers read ``seq_num`` back afterwards.
    """

    def __init__(
        self, fp: IO[bytes], chunk: Callable[..., None], seq_num: int
    ) -> None:
        self.fp = fp
        self.chunk = chunk
        self.seq_num = seq_num

    def write(self, data: bytes) -> None:
        """Emit ``data`` as one fdAT chunk and advance the sequence number."""
        self.chunk(self.fp, b"fdAT", o32(self.seq_num), data)
        self.seq_num += 1


class _Frame(NamedTuple):
    """One animation frame collected by ``_write_multiple_frames``."""

    # frame image, already copied or converted to the output mode
    im: Image.Image
    # region that differs from the frame it is drawn over; None = whole frame
    bbox: tuple[int, int, int, int] | None
    # per-frame copy of the encoder options (duration, disposal, blend, ...)
    encoderinfo: dict[str, Any]


def _write_multiple_frames(
    im: Image.Image,
    fp: IO[bytes],
    chunk: Callable[..., None],
    mode: str,
    rawmode: str,
    default_image: Image.Image | None,
    append_images: list[Image.Image],
) -> Image.Image | None:
    """Write the animation chunks (acTL, fcTL, IDAT/fdAT) for an APNG.

    :param im: First image; its ``encoderinfo``/``info`` supply "duration",
        "loop", "disposal" and "blend".
    :param fp: Binary file object to write to.
    :param chunk: Chunk writer called as ``chunk(fp, cid, *data)``.
    :param mode: Mode every frame is converted to.
    :param rawmode: Raw mode handed to the "zip" encoder.
    :param default_image: If truthy, ``im`` is written as the IDAT default
        image and only ``append_images`` provide the animation frames.
    :param append_images: Further images (or sequences) to append as frames.
    :returns: The single frame image when only one frame remains and there
        is no default image (nothing has been written in that case, the
        caller saves it as a plain image); otherwise ``None``.
    """
    duration = im.encoderinfo.get("duration")
    loop = im.encoderinfo.get("loop", im.info.get("loop", 0))
    disposal = im.encoderinfo.get("disposal", im.info.get("disposal", Disposal.OP_NONE))
    blend = im.encoderinfo.get("blend", im.info.get("blend", Blend.OP_SOURCE))

    if default_image:
        chain = itertools.chain(append_images)
    else:
        chain = itertools.chain([im], append_images)

    im_frames: list[_Frame] = []
    frame_count = 0
    for im_seq in chain:
        for im_frame in ImageSequence.Iterator(im_seq):
            if im_frame.mode == mode:
                im_frame = im_frame.copy()
            else:
                im_frame = im_frame.convert(mode)
            encoderinfo = im.encoderinfo.copy()
            # list/tuple options are indexed per frame
            if isinstance(duration, (list, tuple)):
                encoderinfo["duration"] = duration[frame_count]
            elif duration is None and "duration" in im_frame.info:
                encoderinfo["duration"] = im_frame.info["duration"]
            if isinstance(disposal, (list, tuple)):
                encoderinfo["disposal"] = disposal[frame_count]
            if isinstance(blend, (list, tuple)):
                encoderinfo["blend"] = blend[frame_count]
            frame_count += 1

            if im_frames:
                previous = im_frames[-1]
                prev_disposal = previous.encoderinfo.get("disposal")
                prev_blend = previous.encoderinfo.get("blend")
                if prev_disposal == Disposal.OP_PREVIOUS and len(im_frames) < 2:
                    # there is no earlier frame to go back to
                    prev_disposal = Disposal.OP_BACKGROUND

                # Work out the image this frame is drawn over.
                if prev_disposal == Disposal.OP_BACKGROUND:
                    base_im = previous.im.copy()
                    dispose = Image.core.fill("RGBA", im.size, (0, 0, 0, 0))
                    bbox = previous.bbox
                    if bbox:
                        dispose = dispose.crop(bbox)
                    else:
                        bbox = (0, 0) + im.size
                    base_im.paste(dispose, bbox)
                elif prev_disposal == Disposal.OP_PREVIOUS:
                    base_im = im_frames[-2].im
                else:
                    base_im = previous.im
                delta = ImageChops.subtract_modulo(
                    im_frame.convert("RGBA"), base_im.convert("RGBA")
                )
                bbox = delta.getbbox(alpha_only=False)
                if (
                    not bbox
                    and prev_disposal == encoderinfo.get("disposal")
                    and prev_blend == encoderinfo.get("blend")
                    and "duration" in encoderinfo
                ):
                    # Identical frame: extend the previous frame instead of
                    # writing a new one. The previous frame may carry no
                    # duration of its own (it would be written with a delay
                    # of 0), so do not assume the key exists.
                    previous.encoderinfo["duration"] = (
                        previous.encoderinfo.get("duration", 0)
                        + encoderinfo["duration"]
                    )
                    continue
            else:
                bbox = None
            im_frames.append(_Frame(im_frame, bbox, encoderinfo))

    if len(im_frames) == 1 and not default_image:
        return im_frames[0].im

    # animation control
    chunk(
        fp,
        b"acTL",
        o32(len(im_frames)),  # 0: num_frames
        o32(loop),  # 4: num_plays
    )

    # default image IDAT (if it exists)
    if default_image:
        if im.mode != mode:
            im = im.convert(mode)
        ImageFile._save(
            im,
            cast(IO[bytes], _idat(fp, chunk)),
            [ImageFile._Tile("zip", (0, 0) + im.size, 0, rawmode)],
        )

    # fcTL and fdAT chunks draw their sequence numbers from one counter
    seq_num = 0
    for frame, frame_data in enumerate(im_frames):
        im_frame = frame_data.im
        if not frame_data.bbox:
            bbox = (0, 0) + im_frame.size
        else:
            bbox = frame_data.bbox
            im_frame = im_frame.crop(bbox)
        size = im_frame.size
        encoderinfo = frame_data.encoderinfo
        frame_duration = int(round(encoderinfo.get("duration", 0)))
        frame_disposal = encoderinfo.get("disposal", disposal)
        frame_blend = encoderinfo.get("blend", blend)
        # frame control
        chunk(
            fp,
            b"fcTL",
            o32(seq_num),  # sequence_number
            o32(size[0]),  # width
            o32(size[1]),  # height
            o32(bbox[0]),  # x_offset
            o32(bbox[1]),  # y_offset
            o16(frame_duration),  # delay_numerator
            o16(1000),  # delay_denominator
            o8(frame_disposal),  # dispose_op
            o8(frame_blend),  # blend_op
        )
        seq_num += 1
        # frame data
        if frame == 0 and not default_image:
            # first frame must be in IDAT chunks for backwards compatibility
            ImageFile._save(
                im_frame,
                cast(IO[bytes], _idat(fp, chunk)),
                [ImageFile._Tile("zip", (0, 0) + im_frame.size, 0, rawmode)],
            )
        else:
            fdat_chunks = _fdat(fp, chunk, seq_num)
            ImageFile._save(
                im_frame,
                cast(IO[bytes], fdat_chunks),
                [ImageFile._Tile("zip", (0, 0) + im_frame.size, 0, rawmode)],
            )
            seq_num = fdat_chunks.seq_num
    return None


def _save_all(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
    """Save handler for multi-frame output: ``_save`` with ``save_all=True``."""
    _save(im, fp, filename, save_all=True)


def _save(
    im: Image.Image,
    fp: IO[bytes],
    filename: str | bytes,
    chunk: Callable[..., None] = putchunk,
    save_all: bool = False,
) -> None:
    """Save an image to disk (called by the save method).

    Options are read from ``im.encoderinfo``: "bits", "optimize",
    "compress_level", "compress_type", "dictionary", "icc_profile",
    "pnginfo", "transparency", "dpi", "exif" and, with ``save_all``,
    "default_image" and "append_images".

    :param im: Image to save; ``im.encoderconfig`` is set as a side effect.
    :param fp: Binary file object to write to.
    :param filename: Not used by this function.
    :param chunk: Chunk writer called as ``chunk(fp, cid, *data)``.
    :param save_all: Write all frames (APNG) instead of a single image.
    :raises OSError: If the mode cannot be written as PNG, or if
        "transparency" was passed for a mode that cannot use it.
    """
    if save_all:
        default_image = im.encoderinfo.get(
            "default_image", im.info.get("default_image")
        )
        modes = set()
        sizes = set()
        append_images = im.encoderinfo.get("append_images", [])
        for im_seq in itertools.chain([im], append_images):
            for im_frame in ImageSequence.Iterator(im_seq):
                modes.add(im_frame.mode)
                sizes.add(im_frame.size)
        # pick one output mode for all frames, preferring RGBA, RGB, then P
        for mode in ("RGBA", "RGB", "P"):
            if mode in modes:
                break
        else:
            mode = modes.pop()
        # the canvas must be large enough for the largest frame
        size = tuple(max(frame_size[i] for frame_size in sizes) for i in range(2))
    else:
        size = im.size
        mode = im.mode

    outmode = mode
    if mode == "P":
        #
        # attempt to minimize storage requirements for palette images
        if "bits" in im.encoderinfo:
            # number of bits specified by user
            colors = min(1 << im.encoderinfo["bits"], 256)
        else:
            # check palette contents
            if im.palette:
                colors = max(min(len(im.palette.getdata()[1]) // 3, 256), 1)
            else:
                colors = 256

        if colors <= 16:
            if colors <= 2:
                bits = 1
            elif colors <= 4:
                bits = 2
            else:
                bits = 4
            outmode += f";{bits}"

    # ``colors`` only exists for palette output. With save_all, a "P" first
    # frame can be promoted to RGB/RGBA by other frames; the palette-specific
    # chunks below must then be skipped instead of reading an unbound name.
    palette_output = mode == "P" and im.mode == "P"

    # encoder options
    im.encoderconfig = (
        im.encoderinfo.get("optimize", False),
        im.encoderinfo.get("compress_level", -1),
        im.encoderinfo.get("compress_type", -1),
        im.encoderinfo.get("dictionary", b""),
    )

    # get the corresponding PNG mode
    try:
        rawmode, bit_depth, color_type = _OUTMODES[outmode]
    except KeyError as e:
        msg = f"cannot write mode {mode} as PNG"
        raise OSError(msg) from e

    #
    # write minimal PNG file

    fp.write(_MAGIC)

    chunk(
        fp,
        b"IHDR",
        o32(size[0]),  # 0: size
        o32(size[1]),
        bit_depth,
        color_type,
        b"\0",  # 10: compression
        b"\0",  # 11: filter category
        b"\0",  # 12: interlace flag
    )

    # chunk types from pnginfo that are written at most once, before PLTE
    chunks = [b"cHRM", b"cICP", b"gAMA", b"sBIT", b"sRGB", b"tIME"]

    icc = im.encoderinfo.get("icc_profile", im.info.get("icc_profile"))
    if icc:
        # ICC profile
        # according to PNG spec, the iCCP chunk contains:
        # Profile name  1-79 bytes (character string)
        # Null separator        1 byte (null character)
        # Compression method    1 byte (0)
        # Compressed profile    n bytes (zlib with deflate compression)
        name = b"ICC Profile"
        data = name + b"\0\0" + zlib.compress(icc)
        chunk(fp, b"iCCP", data)

        # You must either have sRGB or iCCP.
        # Disallow sRGB chunks when an iCCP-chunk has been emitted.
        chunks.remove(b"sRGB")

    info = im.encoderinfo.get("pnginfo")
    if info:
        chunks_multiple_allowed = [b"sPLT", b"iTXt", b"tEXt", b"zTXt"]
        for info_chunk in info.chunks:
            cid, data = info_chunk[:2]
            if cid in chunks:
                # only the first occurrence of a single-use chunk is written
                chunks.remove(cid)
                chunk(fp, cid, data)
            elif cid in chunks_multiple_allowed:
                chunk(fp, cid, data)
            elif cid[1:2].islower():
                # Private chunk
                after_idat = len(info_chunk) == 3 and info_chunk[2]
                if not after_idat:
                    chunk(fp, cid, data)

    if palette_output:
        palette_byte_number = colors * 3
        palette_bytes = im.im.getpalette("RGB")[:palette_byte_number]
        # pad a short palette with zero bytes up to the declared size
        while len(palette_bytes) < palette_byte_number:
            palette_bytes += b"\0"
        chunk(fp, b"PLTE", palette_bytes)

    transparency = im.encoderinfo.get("transparency", im.info.get("transparency", None))

    if transparency or transparency == 0:
        if palette_output:
            # limit to actual palette size
            alpha_bytes = colors
            if isinstance(transparency, bytes):
                chunk(fp, b"tRNS", transparency[:alpha_bytes])
            else:
                # a single index: opaque entries before it, then one
                # fully transparent entry
                transparency = max(0, min(255, transparency))
                alpha = b"\xff" * transparency + b"\0"
                chunk(fp, b"tRNS", alpha[:alpha_bytes])
        elif im.mode in ("1", "L", "I", "I;16"):
            transparency = max(0, min(65535, transparency))
            chunk(fp, b"tRNS", o16(transparency))
        elif im.mode == "RGB":
            red, green, blue = transparency
            chunk(fp, b"tRNS", o16(red) + o16(green) + o16(blue))
        else:
            if "transparency" in im.encoderinfo:
                # don't bother with transparency if it's an RGBA
                # and it's in the info dict. It's probably just stale.
                msg = "cannot use transparency for this mode"
                raise OSError(msg)
    else:
        if palette_output and im.im.getpalettemode() == "RGBA":
            alpha = im.im.getpalette("RGBA", "A")
            alpha_bytes = colors
            chunk(fp, b"tRNS", alpha[:alpha_bytes])

    dpi = im.encoderinfo.get("dpi")
    if dpi:
        # dots per inch -> rounded pixels per 0.0254 m (one inch), unit 1
        chunk(
            fp,
            b"pHYs",
            o32(int(dpi[0] / 0.0254 + 0.5)),
            o32(int(dpi[1] / 0.0254 + 0.5)),
            b"\x01",
        )

    if info:
        # single-use chunks that are written after PLTE
        chunks = [b"bKGD", b"hIST"]
        for info_chunk in info.chunks:
            cid, data = info_chunk[:2]
            if cid in chunks:
                chunks.remove(cid)
                chunk(fp, cid, data)

    exif = im.encoderinfo.get("exif")
    if exif:
        if isinstance(exif, Image.Exif):
            exif = exif.tobytes(8)
        # the eXIf chunk is written without the 6-byte "Exif\0\0" prefix
        if exif.startswith(b"Exif\x00\x00"):
            exif = exif[6:]
        chunk(fp, b"eXIf", exif)

    single_im: Image.Image | None = im
    if save_all:
        # returns None once the frames have been written as an animation
        single_im = _write_multiple_frames(
            im, fp, chunk, mode, rawmode, default_image, append_images
        )
    if single_im:
        ImageFile._save(
            single_im,
            cast(IO[bytes], _idat(fp, chunk)),
            [ImageFile._Tile("zip", (0, 0) + single_im.size, 0, rawmode)],
        )

    if info:
        for info_chunk in info.chunks:
            cid, data = info_chunk[:2]
            if cid[1:2].islower():
                # Private chunk
                after_idat = len(info_chunk) == 3 and info_chunk[2]
                if after_idat:
                    chunk(fp, cid, data)

    chunk(fp, b"IEND", b"")

    if hasattr(fp, "flush"):
        fp.flush()


# --------------------------------------------------------------------
# PNG chunk converter
def getchunks(im: Image.Image, **params: Any) -> list[tuple[bytes, bytes, bytes]]:
    """Return a list of PNG chunks representing this image.

    The image is run through ``_save`` with a chunk writer that records
    each chunk instead of writing it; ``params`` become the encoder options.

    :returns: One ``(cid, data, crc)`` tuple per chunk, in the order written.
    """
    from io import BytesIO

    collected = []

    def collect(fp: IO[bytes], cid: bytes, *data: bytes) -> None:
        # same CRC computation as a real chunk writer, but nothing is written
        payload = b"".join(data)
        checksum = _crc32(payload, _crc32(cid))
        collected.append((cid, payload, o32(checksum)))

    # only receives what _save writes directly rather than through the writer
    sink = BytesIO()

    try:
        im.encoderinfo = params
        _save(im, sink, "", collect)
    finally:
        del im.encoderinfo

    return collected
# -------------------------------------------------------------------- # Registry Image.register_open(PngImageFile.format, PngImageFile, _accept) Image.register_save(PngImageFile.format, _save) Image.register_save_all(PngImageFile.format, _save_all) Image.register_extensions(PngImageFile.format, [".png", ".apng"]) Image.register_mime(PngImageFile.format, "image/png")