mirror of
https://github.com/ggerganov/llama.cpp.git
synced 2025-01-13 12:10:18 +00:00
3a14e00366
* gguf-py : use classes for quants * convert_hf : simplify internal quantization type selection * gguf-py : fix flake8 lint * gguf-py : fix BF16 numpy view type * gguf-py : remove LlamaFileTypeMap Too specific to 'llama.cpp', and would be a maintenance burden to keep up to date. * gguf-py : add generic quantize and dequantize functions The quant classes no longer need to be known, only the target or the source type, for 'quantize' and 'dequantize', respectively.
214 lines
7.9 KiB
Python
214 lines
7.9 KiB
Python
from __future__ import annotations
|
|
from abc import ABC, abstractmethod
|
|
from typing import Any, Callable, Sequence
|
|
|
|
from numpy.typing import DTypeLike
|
|
|
|
from .constants import GGML_QUANT_SIZES, GGMLQuantizationType
|
|
from .lazy import LazyNumpyTensor
|
|
|
|
import numpy as np
|
|
|
|
|
|
def quant_shape_to_byte_shape(shape: Sequence[int], quant_type: GGMLQuantizationType) -> tuple[int, ...]:
    """Convert an element-count shape into the shape of its quantized byte buffer.

    Only the last dimension changes: every ``block_size`` elements of a row
    are replaced by ``type_size`` bytes, where both sizes are looked up in
    ``GGML_QUANT_SIZES`` for ``quant_type``.

    Raises:
        ValueError: if the last dimension is not a multiple of the block size.
    """
    block_size, type_size = GGML_QUANT_SIZES[quant_type]
    blocks_per_row, leftover = divmod(shape[-1], block_size)
    if leftover != 0:
        raise ValueError(f"Quantized tensor row size ({shape[-1]}) is not a multiple of {quant_type.name} block size ({block_size})")
    bytes_per_row = blocks_per_row * type_size
    return (*shape[:-1], bytes_per_row)
|
|
|
|
|
|
def quant_shape_from_byte_shape(shape: Sequence[int], quant_type: GGMLQuantizationType) -> tuple[int, ...]:
    """Convert the shape of a quantized byte buffer back into an element-count shape.

    Only the last dimension changes: every ``type_size`` bytes of a row
    stand for ``block_size`` elements, where both sizes are looked up in
    ``GGML_QUANT_SIZES`` for ``quant_type``.

    Raises:
        ValueError: if the last dimension is not a multiple of the type size.
    """
    block_size, type_size = GGML_QUANT_SIZES[quant_type]
    blocks_per_row, leftover = divmod(shape[-1], type_size)
    if leftover != 0:
        raise ValueError(f"Quantized tensor bytes per row ({shape[-1]}) is not a multiple of {quant_type.name} type size ({type_size})")
    elements_per_row = blocks_per_row * block_size
    return (*shape[:-1], elements_per_row)
|
|
|
|
|
|
# This is faster than np.vectorize and np.apply_along_axis because it works on more than one row at a time
|
|
def _apply_over_grouped_rows(func: Callable[[np.ndarray], np.ndarray], arr: np.ndarray, otype: DTypeLike, oshape: tuple[int, ...]) -> np.ndarray:
    """Apply ``func`` to ``arr`` in batches of rows and assemble the result.

    ``arr`` is flattened to 2-D (all leading dimensions merged, last
    dimension kept), split into groups of rows, and ``func`` is called once
    per group. The flattened outputs are concatenated into a buffer of dtype
    ``otype`` which is finally reshaped to ``oshape``.
    """
    flat_rows = arr.reshape((-1, arr.shape[-1]))

    # Total number of output elements (1 when oshape is empty).
    total = 1
    for extent in oshape:
        total *= extent
    buffer = np.empty(shape=total, dtype=otype)

    # compute over groups of 16 rows (arbitrary, but seems good for performance)
    group_count = (flat_rows.shape[0] // 16) or 1
    pieces = [func(chunk).ravel() for chunk in np.array_split(flat_rows, group_count)]
    np.concatenate(pieces, axis=0, out=buffer)

    return buffer.reshape(oshape)
|
|
|
|
|
|
# round away from zero
|
|
# ref: https://stackoverflow.com/a/59143326/22827863
|
|
def np_roundf(n: np.ndarray) -> np.ndarray:
    """Round each element to the nearest integer, with halves going away from zero."""
    magnitude = np.abs(n)
    whole = np.floor(magnitude)
    # floor(2 * fraction) is 1 exactly when the fractional part is >= 0.5
    bump = np.floor(2 * (magnitude - whole))
    rounded_magnitude = whole + bump
    return np.sign(n) * rounded_magnitude
|
|
|
|
|
|
class QuantError(Exception):
    """Raised when a tensor cannot be quantized to the requested type."""
|
|
|
|
|
|
# Registry mapping each quantization type to the class implementing it.
# Entries are added by __Quant.__init_subclass__ and looked up by the
# module-level quantize() and dequantize() functions.
_type_traits: dict[GGMLQuantizationType, type[__Quant]] = {}
|
|
|
|
|
|
def quantize(data: np.ndarray, qtype: GGMLQuantizationType) -> np.ndarray:
    """Convert ``data`` to the representation of the target type ``qtype``.

    F32 and F16 are plain dtype casts; any other type is delegated to the
    class registered for it in ``_type_traits``.

    Raises:
        NotImplementedError: if no conversion is registered for ``qtype``.
    """
    if qtype == GGMLQuantizationType.F32:
        return data.astype(np.float32, copy=False)
    if qtype == GGMLQuantizationType.F16:
        return data.astype(np.float16, copy=False)

    quant_cls = _type_traits.get(qtype)
    if quant_cls is None:
        raise NotImplementedError(f"Quantization for {qtype.name} is not yet implemented")
    return quant_cls.quantize(data)
|
|
|
|
|
|
def dequantize(data: np.ndarray, qtype: GGMLQuantizationType) -> np.ndarray:
    """Convert ``data`` stored as source type ``qtype`` to float32.

    F32 and F16 are plain dtype casts; any other type is delegated to the
    class registered for it in ``_type_traits``.

    Raises:
        NotImplementedError: if no conversion is registered for ``qtype``.
    """
    if qtype in (GGMLQuantizationType.F32, GGMLQuantizationType.F16):
        return data.astype(np.float32, copy=False)

    quant_cls = _type_traits.get(qtype)
    if quant_cls is None:
        raise NotImplementedError(f"Dequantization for {qtype.name} is not yet implemented")
    return quant_cls.dequantize(data)
|
|
|
|
|
|
class __Quant(ABC):
    """Abstract base for per-type quantization/dequantization classes.

    Subclasses are declared with a ``qtype=...`` class keyword and implement
    ``quantize_blocks`` and ``dequantize_blocks``. Declaring a subclass
    records its block and type sizes and registers it in ``_type_traits``.
    All functionality is exposed through classmethods; the classes are not
    meant to be instantiated.
    """

    # Quantization type handled by the subclass (set in __init_subclass__).
    qtype: GGMLQuantizationType
    # Number of elements per block, from GGML_QUANT_SIZES[qtype].
    block_size: int
    # Number of bytes per quantized block, from GGML_QUANT_SIZES[qtype].
    type_size: int

    def __init__(self):
        # The exception has to be raised, not returned: returning a non-None
        # value from __init__ only made Python raise a generic TypeError about
        # the return value, so the intended message was never shown.
        raise TypeError("Quant conversion classes can't have instances")

    def __init_subclass__(cls, qtype: GGMLQuantizationType) -> None:
        """Configure and register a newly declared subclass for ``qtype``."""
        cls.qtype = qtype
        cls.block_size, cls.type_size = GGML_QUANT_SIZES[qtype]
        # Replace the placeholder lazy methods with wrappers around the eager
        # array implementations; meta_noop carries the resulting dtype and a
        # shape-conversion function.
        cls.__quantize_lazy = LazyNumpyTensor._wrap_fn(
            cls.__quantize_array,
            meta_noop=(np.uint8, cls.__shape_to_bytes)
        )
        cls.__dequantize_lazy = LazyNumpyTensor._wrap_fn(
            cls.__dequantize_array,
            meta_noop=(np.float32, cls.__shape_from_bytes)
        )
        # Each quantization type may only be implemented by one class.
        assert qtype not in _type_traits
        _type_traits[qtype] = cls

    @classmethod
    @abstractmethod
    def quantize_blocks(cls, blocks: np.ndarray) -> np.ndarray:
        """Quantize float32 ``blocks`` of shape ``(n_blocks, block_size)``.

        Implementations must return a uint8 array whose last dimension is
        ``type_size`` (checked by ``quantize_rows``).
        """
        raise NotImplementedError

    @classmethod
    @abstractmethod
    def dequantize_blocks(cls, blocks: np.ndarray) -> np.ndarray:
        """Dequantize uint8 ``blocks`` of shape ``(n_blocks, type_size)``.

        Implementations must return a float32 array whose last dimension is
        ``block_size`` (checked by ``dequantize_rows``).
        """
        raise NotImplementedError

    @classmethod
    def quantize_rows(cls, rows: np.ndarray) -> np.ndarray:
        """Quantize ``rows`` and return the bytes with the matching byte shape."""
        rows = rows.astype(np.float32, copy=False)
        shape = rows.shape
        n_blocks = rows.size // cls.block_size
        blocks = rows.reshape((n_blocks, cls.block_size))
        blocks = cls.quantize_blocks(blocks)
        assert blocks.dtype == np.uint8
        assert blocks.shape[-1] == cls.type_size
        return blocks.reshape(cls.__shape_to_bytes(shape))

    @classmethod
    def dequantize_rows(cls, rows: np.ndarray) -> np.ndarray:
        """Dequantize byte ``rows`` and return float32 values with the matching shape."""
        rows = rows.view(np.uint8)
        shape = rows.shape
        n_blocks = rows.size // cls.type_size
        blocks = rows.reshape((n_blocks, cls.type_size))
        blocks = cls.dequantize_blocks(blocks)
        assert blocks.dtype == np.float32
        assert blocks.shape[-1] == cls.block_size
        return blocks.reshape(cls.__shape_from_bytes(shape))

    @classmethod
    def __shape_to_bytes(cls, shape: Sequence[int]):
        """Return the byte shape of a tensor of ``shape`` quantized as ``cls.qtype``."""
        return quant_shape_to_byte_shape(shape, cls.qtype)

    @classmethod
    def __shape_from_bytes(cls, shape: Sequence[int]):
        """Return the element shape of a ``cls.qtype`` byte buffer of ``shape``."""
        return quant_shape_from_byte_shape(shape, cls.qtype)

    @classmethod
    def __quantize_array(cls, array: np.ndarray) -> np.ndarray:
        """Quantize a whole array by running ``quantize_rows`` over groups of rows."""
        return _apply_over_grouped_rows(cls.quantize_rows, arr=array, otype=np.uint8, oshape=cls.__shape_to_bytes(array.shape))

    @classmethod
    def __dequantize_array(cls, array: np.ndarray) -> np.ndarray:
        """Dequantize a whole array by running ``dequantize_rows`` over groups of rows."""
        return _apply_over_grouped_rows(cls.dequantize_rows, arr=array, otype=np.float32, oshape=cls.__shape_from_bytes(array.shape))

    @classmethod
    def __quantize_lazy(cls, lazy_tensor: LazyNumpyTensor, /) -> Any:
        """Placeholder; overwritten on every subclass in ``__init_subclass__``."""
        pass

    @classmethod
    def __dequantize_lazy(cls, lazy_tensor: LazyNumpyTensor, /) -> Any:
        """Placeholder; overwritten on every subclass in ``__init_subclass__``."""
        pass

    @classmethod
    def can_quantize(cls, tensor: np.ndarray | LazyNumpyTensor) -> bool:
        """Return whether the last dimension of ``tensor`` is a multiple of ``block_size``."""
        return tensor.shape[-1] % cls.block_size == 0

    @classmethod
    def quantize(cls, tensor: np.ndarray | LazyNumpyTensor) -> np.ndarray:
        """Quantize ``tensor`` to ``cls.qtype``.

        Lazy tensors are routed through the lazy wrapper, plain arrays through
        the eager implementation.

        Raises:
            QuantError: if ``can_quantize`` rejects the tensor's shape.
        """
        if not cls.can_quantize(tensor):
            raise QuantError(f"Can't quantize tensor with shape {tensor.shape} to {cls.qtype.name}")
        if isinstance(tensor, LazyNumpyTensor):
            return cls.__quantize_lazy(tensor)
        else:
            return cls.__quantize_array(tensor)

    @classmethod
    def dequantize(cls, tensor: np.ndarray | LazyNumpyTensor) -> np.ndarray:
        """Dequantize ``tensor`` from ``cls.qtype``.

        Lazy tensors are routed through the lazy wrapper, plain arrays through
        the eager implementation.
        """
        if isinstance(tensor, LazyNumpyTensor):
            return cls.__dequantize_lazy(tensor)
        else:
            return cls.__dequantize_array(tensor)
|
|
|
|
|
|
class BF16(__Quant, qtype=GGMLQuantizationType.BF16):
    """BF16 conversion: keeps the upper 16 bits of each float32 bit pattern."""

    @classmethod
    # same as ggml_compute_fp32_to_bf16 in ggml-impl.h
    def quantize_blocks(cls, blocks: np.ndarray) -> np.ndarray:
        """Round float32 values to 16 bits and return them as bytes."""
        bits = blocks.view(np.uint32)
        # force nan to quiet
        is_nan = (bits & 0x7fffffff) > 0x7f800000
        quiet_nan = (bits & np.uint32(0xffff0000)) | np.uint32(64 << 16)
        bits = np.where(is_nan, quiet_nan, bits)
        # round to nearest even
        rounding_bias = 0x7fff + ((bits >> 16) & 1)
        upper = (np.uint64(bits) + rounding_bias) >> 16
        return upper.astype(np.uint16).view(np.uint8)

    @classmethod
    def dequantize_blocks(cls, blocks: np.ndarray) -> np.ndarray:
        """Place each 16-bit value in the upper half of a 32-bit word and view it as float32."""
        halves = blocks.view(np.int16)
        widened = halves.astype(np.int32) << 16
        return widened.view(np.float32)
|
|
|
|
|
|
class Q8_0(__Quant, qtype=GGMLQuantizationType.Q8_0):
    """Q8_0 conversion: one float16 scale followed by int8 values per block."""

    @classmethod
    # Implementation of Q8_0 with bit-exact same results as reference implementation in ggml-quants.c
    def quantize_blocks(cls, blocks: np.ndarray) -> np.ndarray:
        """Quantize each block to a float16 scale plus int8 values, returned as bytes."""
        # Per-block scale so that the largest magnitude maps to 127.
        scale = np.abs(blocks).max(axis=1, keepdims=True) / 127
        # All-zero blocks have a zero scale; use 0 as the inverse instead of inf.
        with np.errstate(divide="ignore"):
            inv_scale = np.where(scale == 0, 0, 1 / scale)
        quants = np_roundf(blocks * inv_scale)

        # (n_blocks, 2)
        scale_bytes = scale.astype(np.float16).view(np.uint8)
        # (n_blocks, block_size)
        quant_bytes = quants.astype(np.int8).view(np.uint8)

        return np.concatenate([scale_bytes, quant_bytes], axis=1)

    @classmethod
    def dequantize_blocks(cls, blocks: np.ndarray) -> np.ndarray:
        """Multiply each block's int8 values by its float16 scale, as float32."""
        # The first 2 bytes of every block hold the scale, the rest the values.
        scale_bytes, quant_bytes = np.split(blocks, [2], axis=1)
        scale = scale_bytes.view(np.float16).astype(np.float32)
        quants = quant_bytes.view(np.int8).astype(np.float32)

        return (quants * scale)
|