mirror of
https://github.com/ggerganov/llama.cpp.git
synced 2025-01-11 19:21:46 +00:00
3fd62a6b1c
* py : type-check all Python scripts with Pyright * server-tests : use trailing slash in openai base_url * server-tests : add more type annotations * server-tests : strip "chat" from base_url in oai_chat_completions * server-tests : model metadata is a dict * ci : disable pip cache in type-check workflow The cache is not shared between branches, and it's 250MB in size, so it would become quite a big part of the 10GB cache limit of the repo. * py : fix new type errors from master branch * tests : fix test-tokenizer-random.py Apparently, gcc applies optimisations even when pre-processing, which confuses pycparser. * ci : only show warnings and errors in python type-check The "information" level otherwise has entries from 'examples/pydantic_models_to_grammar.py', which could be confusing for someone trying to figure out what failed, considering that these messages can safely be ignored even though they look like errors.
318 lines
12 KiB
Python
318 lines
12 KiB
Python
#
|
|
# GGUF file reading/modification support. For API usage information,
|
|
# please see the files scripts/ for some fairly simple examples.
|
|
#
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
from collections import OrderedDict
|
|
from typing import Any, Literal, NamedTuple, TypeVar, Union
|
|
|
|
import numpy as np
|
|
import numpy.typing as npt
|
|
|
|
from .quants import quant_shape_to_byte_shape
|
|
|
|
if __name__ == "__main__":
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Allow running file in package as a script.
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
from gguf.constants import (
|
|
GGML_QUANT_SIZES,
|
|
GGUF_DEFAULT_ALIGNMENT,
|
|
GGUF_MAGIC,
|
|
GGUF_VERSION,
|
|
GGMLQuantizationType,
|
|
GGUFValueType,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
READER_SUPPORTED_VERSIONS = [2, GGUF_VERSION]
|
|
|
|
|
|
class ReaderField(NamedTuple):
|
|
# Offset to start of this field.
|
|
offset: int
|
|
|
|
# Name of the field (not necessarily from file data).
|
|
name: str
|
|
|
|
# Data parts. Some types have multiple components, such as strings
|
|
# that consist of a length followed by the string data.
|
|
parts: list[npt.NDArray[Any]] = []
|
|
|
|
# Indexes into parts that we can call the actual data. For example
|
|
# an array of strings will be populated with indexes to the actual
|
|
# string data.
|
|
data: list[int] = [-1]
|
|
|
|
types: list[GGUFValueType] = []
|
|
|
|
|
|
class ReaderTensor(NamedTuple):
|
|
name: str
|
|
tensor_type: GGMLQuantizationType
|
|
shape: npt.NDArray[np.uint32]
|
|
n_elements: int
|
|
n_bytes: int
|
|
data_offset: int
|
|
data: npt.NDArray[Any]
|
|
field: ReaderField
|
|
|
|
|
|
class GGUFReader:
|
|
# I - same as host, S - swapped
|
|
byte_order: Literal['I', 'S'] = 'I'
|
|
alignment: int = GGUF_DEFAULT_ALIGNMENT
|
|
data_offset: int
|
|
|
|
# Note: Internal helper, API may change.
|
|
gguf_scalar_to_np: dict[GGUFValueType, type[np.generic]] = {
|
|
GGUFValueType.UINT8: np.uint8,
|
|
GGUFValueType.INT8: np.int8,
|
|
GGUFValueType.UINT16: np.uint16,
|
|
GGUFValueType.INT16: np.int16,
|
|
GGUFValueType.UINT32: np.uint32,
|
|
GGUFValueType.INT32: np.int32,
|
|
GGUFValueType.FLOAT32: np.float32,
|
|
GGUFValueType.UINT64: np.uint64,
|
|
GGUFValueType.INT64: np.int64,
|
|
GGUFValueType.FLOAT64: np.float64,
|
|
GGUFValueType.BOOL: np.bool_,
|
|
}
|
|
|
|
def __init__(self, path: os.PathLike[str] | str, mode: Literal['r', 'r+', 'c'] = 'r'):
|
|
self.data = np.memmap(path, mode = mode)
|
|
offs = 0
|
|
|
|
# Check for GGUF magic
|
|
if self._get(offs, np.uint32, override_order = '<')[0] != GGUF_MAGIC:
|
|
raise ValueError('GGUF magic invalid')
|
|
offs += 4
|
|
|
|
# Check GGUF version
|
|
temp_version = self._get(offs, np.uint32)
|
|
if temp_version[0] & 65535 == 0:
|
|
# If we get 0 here that means it's (probably) a GGUF file created for
|
|
# the opposite byte order of the machine this script is running on.
|
|
self.byte_order = 'S'
|
|
temp_version = temp_version.newbyteorder(self.byte_order)
|
|
version = temp_version[0]
|
|
if version not in READER_SUPPORTED_VERSIONS:
|
|
raise ValueError(f'Sorry, file appears to be version {version} which we cannot handle')
|
|
self.fields: OrderedDict[str, ReaderField] = OrderedDict()
|
|
self.tensors: list[ReaderTensor] = []
|
|
offs += self._push_field(ReaderField(offs, 'GGUF.version', [temp_version], [0], [GGUFValueType.UINT32]))
|
|
|
|
# Check tensor count and kv count
|
|
temp_counts = self._get(offs, np.uint64, 2)
|
|
offs += self._push_field(ReaderField(offs, 'GGUF.tensor_count', [temp_counts[:1]], [0], [GGUFValueType.UINT64]))
|
|
offs += self._push_field(ReaderField(offs, 'GGUF.kv_count', [temp_counts[1:]], [0], [GGUFValueType.UINT64]))
|
|
tensor_count, kv_count = temp_counts
|
|
offs = self._build_fields(offs, kv_count)
|
|
|
|
# Build Tensor Info Fields
|
|
offs, tensors_fields = self._build_tensor_info(offs, tensor_count)
|
|
new_align = self.fields.get('general.alignment')
|
|
if new_align is not None:
|
|
if new_align.types != [GGUFValueType.UINT32]:
|
|
raise ValueError('Bad type for general.alignment field')
|
|
self.alignment = new_align.parts[-1][0]
|
|
padding = offs % self.alignment
|
|
if padding != 0:
|
|
offs += self.alignment - padding
|
|
self.data_offset = offs
|
|
self._build_tensors(offs, tensors_fields)
|
|
|
|
_DT = TypeVar('_DT', bound = npt.DTypeLike)
|
|
|
|
# Fetch a key/value metadata field by key.
|
|
def get_field(self, key: str) -> Union[ReaderField, None]:
|
|
return self.fields.get(key, None)
|
|
|
|
# Fetch a tensor from the list by index.
|
|
def get_tensor(self, idx: int) -> ReaderTensor:
|
|
return self.tensors[idx]
|
|
|
|
def _get(
|
|
self, offset: int, dtype: npt.DTypeLike, count: int = 1, override_order: None | Literal['I', 'S', '<'] = None,
|
|
) -> npt.NDArray[Any]:
|
|
count = int(count)
|
|
itemsize = int(np.empty([], dtype = dtype).itemsize)
|
|
end_offs = offset + itemsize * count
|
|
return (
|
|
self.data[offset:end_offs]
|
|
.view(dtype = dtype)[:count]
|
|
.newbyteorder(override_order or self.byte_order)
|
|
)
|
|
|
|
def _push_field(self, field: ReaderField, skip_sum: bool = False) -> int:
|
|
if field.name in self.fields:
|
|
# TODO: add option to generate error on duplicate keys
|
|
# raise KeyError(f'Duplicate {field.name} already in list at offset {field.offset}')
|
|
|
|
logger.warning(f'Duplicate key {field.name} at offset {field.offset}')
|
|
self.fields[field.name + '_{}'.format(field.offset)] = field
|
|
else:
|
|
self.fields[field.name] = field
|
|
return 0 if skip_sum else sum(int(part.nbytes) for part in field.parts)
|
|
|
|
def _get_str(self, offset: int) -> tuple[npt.NDArray[np.uint64], npt.NDArray[np.uint8]]:
|
|
slen = self._get(offset, np.uint64)
|
|
return slen, self._get(offset + 8, np.uint8, slen[0])
|
|
|
|
def _get_field_parts(
|
|
self, orig_offs: int, raw_type: int,
|
|
) -> tuple[int, list[npt.NDArray[Any]], list[int], list[GGUFValueType]]:
|
|
offs = orig_offs
|
|
types: list[GGUFValueType] = []
|
|
gtype = GGUFValueType(raw_type)
|
|
types.append(gtype)
|
|
# Handle strings.
|
|
if gtype == GGUFValueType.STRING:
|
|
sparts: list[npt.NDArray[Any]] = list(self._get_str(offs))
|
|
size = sum(int(part.nbytes) for part in sparts)
|
|
return size, sparts, [1], types
|
|
# Check if it's a simple scalar type.
|
|
nptype = self.gguf_scalar_to_np.get(gtype)
|
|
if nptype is not None:
|
|
val = self._get(offs, nptype)
|
|
return int(val.nbytes), [val], [0], types
|
|
# Handle arrays.
|
|
if gtype == GGUFValueType.ARRAY:
|
|
raw_itype = self._get(offs, np.uint32)
|
|
offs += int(raw_itype.nbytes)
|
|
alen = self._get(offs, np.uint64)
|
|
offs += int(alen.nbytes)
|
|
aparts: list[npt.NDArray[Any]] = [raw_itype, alen]
|
|
data_idxs: list[int] = []
|
|
for idx in range(alen[0]):
|
|
curr_size, curr_parts, curr_idxs, curr_types = self._get_field_parts(offs, raw_itype[0])
|
|
if idx == 0:
|
|
types += curr_types
|
|
idxs_offs = len(aparts)
|
|
aparts += curr_parts
|
|
data_idxs += (idx + idxs_offs for idx in curr_idxs)
|
|
offs += curr_size
|
|
return offs - orig_offs, aparts, data_idxs, types
|
|
# We can't deal with this one.
|
|
raise ValueError('Unknown/unhandled field type {gtype}')
|
|
|
|
def _get_tensor_info_field(self, orig_offs: int) -> ReaderField:
|
|
offs = orig_offs
|
|
|
|
# Get Tensor Name
|
|
name_len, name_data = self._get_str(offs)
|
|
offs += int(name_len.nbytes + name_data.nbytes)
|
|
|
|
# Get Tensor Dimensions Count
|
|
n_dims = self._get(offs, np.uint32)
|
|
offs += int(n_dims.nbytes)
|
|
|
|
# Get Tensor Dimension Array
|
|
dims = self._get(offs, np.uint64, n_dims[0])
|
|
offs += int(dims.nbytes)
|
|
|
|
# Get Tensor Encoding Scheme Type
|
|
raw_dtype = self._get(offs, np.uint32)
|
|
offs += int(raw_dtype.nbytes)
|
|
|
|
# Get Tensor Offset
|
|
offset_tensor = self._get(offs, np.uint64)
|
|
offs += int(offset_tensor.nbytes)
|
|
|
|
return ReaderField(
|
|
orig_offs,
|
|
str(bytes(name_data), encoding = 'utf-8'),
|
|
[name_len, name_data, n_dims, dims, raw_dtype, offset_tensor],
|
|
[1, 3, 4, 5],
|
|
)
|
|
|
|
def _build_fields(self, offs: int, count: int) -> int:
|
|
for _ in range(count):
|
|
orig_offs = offs
|
|
kv_klen, kv_kdata = self._get_str(offs)
|
|
offs += int(kv_klen.nbytes + kv_kdata.nbytes)
|
|
raw_kv_type = self._get(offs, np.uint32)
|
|
offs += int(raw_kv_type.nbytes)
|
|
parts: list[npt.NDArray[Any]] = [kv_klen, kv_kdata, raw_kv_type]
|
|
idxs_offs = len(parts)
|
|
field_size, field_parts, field_idxs, field_types = self._get_field_parts(offs, raw_kv_type[0])
|
|
parts += field_parts
|
|
self._push_field(ReaderField(
|
|
orig_offs,
|
|
str(bytes(kv_kdata), encoding = 'utf-8'),
|
|
parts,
|
|
[idx + idxs_offs for idx in field_idxs],
|
|
field_types,
|
|
), skip_sum = True)
|
|
offs += field_size
|
|
return offs
|
|
|
|
def _build_tensor_info(self, offs: int, count: int) -> tuple[int, list[ReaderField]]:
|
|
tensor_fields = []
|
|
for _ in range(count):
|
|
field = self._get_tensor_info_field(offs)
|
|
offs += sum(int(part.nbytes) for part in field.parts)
|
|
tensor_fields.append(field)
|
|
return offs, tensor_fields
|
|
|
|
def _build_tensors(self, start_offs: int, fields: list[ReaderField]) -> None:
|
|
tensors = []
|
|
tensor_names = set() # keep track of name to prevent duplicated tensors
|
|
for field in fields:
|
|
_name_len, name_data, _n_dims, dims, raw_dtype, offset_tensor = field.parts
|
|
# check if there's any tensor having same name already in the list
|
|
tensor_name = str(bytes(name_data), encoding = 'utf-8')
|
|
if tensor_name in tensor_names:
|
|
raise ValueError(f'Found duplicated tensor with name {tensor_name}')
|
|
tensor_names.add(tensor_name)
|
|
ggml_type = GGMLQuantizationType(raw_dtype[0])
|
|
n_elems = int(np.prod(dims))
|
|
np_dims = tuple(reversed(dims.tolist()))
|
|
block_size, type_size = GGML_QUANT_SIZES[ggml_type]
|
|
n_bytes = n_elems * type_size // block_size
|
|
data_offs = int(start_offs + offset_tensor[0])
|
|
item_type: npt.DTypeLike
|
|
if ggml_type == GGMLQuantizationType.F16:
|
|
item_count = n_elems
|
|
item_type = np.float16
|
|
elif ggml_type == GGMLQuantizationType.F32:
|
|
item_count = n_elems
|
|
item_type = np.float32
|
|
elif ggml_type == GGMLQuantizationType.F64:
|
|
item_count = n_elems
|
|
item_type = np.float64
|
|
elif ggml_type == GGMLQuantizationType.I8:
|
|
item_count = n_elems
|
|
item_type = np.int8
|
|
elif ggml_type == GGMLQuantizationType.I16:
|
|
item_count = n_elems
|
|
item_type = np.int16
|
|
elif ggml_type == GGMLQuantizationType.I32:
|
|
item_count = n_elems
|
|
item_type = np.int32
|
|
elif ggml_type == GGMLQuantizationType.I64:
|
|
item_count = n_elems
|
|
item_type = np.int64
|
|
else:
|
|
item_count = n_bytes
|
|
item_type = np.uint8
|
|
np_dims = quant_shape_to_byte_shape(np_dims, ggml_type)
|
|
tensors.append(ReaderTensor(
|
|
name = tensor_name,
|
|
tensor_type = ggml_type,
|
|
shape = dims,
|
|
n_elements = n_elems,
|
|
n_bytes = n_bytes,
|
|
data_offset = data_offs,
|
|
data = self._get(data_offs, item_type, item_count).reshape(np_dims),
|
|
field = field,
|
|
))
|
|
self.tensors = tensors
|