mirror of
https://github.com/ggerganov/llama.cpp.git
synced 2025-01-01 06:14:35 +00:00
52fc8705a0
* support splits in convert.py * Support split by size and dry run to write estimated shards/filesizes * Move split functionality to new GGUFManager class * fix improper function signature * tentative push of convert-hf-to-gguf support * resolve merge + SplitArguments for easier parsing * Fix eager tensor memory leak and remove convert.py changes Removed a memory leak caused by unexpected reference retention to eager tensors. Also removed GGUFManager functionality in convert.py in favor of specializing for convert-hf-to-gguf.py. * refactor SplitStrategy to be a deque Instead of having SplitStrategy have a `data` field that is a deque, just have SplitStrategy be a subclass of deque itself. * fix Q8 quantization * remove unnecessary imports in gguf_manager * fix final? merge issue * fix gguf_writer placement and remove comments * oops, actually fix gguf_writer placement * reduce duplicated code from gguf_writer * further simplify GGUFManager * simplify even further and standardize with GGUFWriter * reduce diffs with master * form shards while adding tensors, SHA256 sums agree with master * re-add type hint Co-authored-by: compilade <git@compilade.net> * GGUFWriter compatibility fix Co-authored-by: compilade <git@compilade.net> * Shard dataclass and un-negative dont_add_architecture * type consistency in format_n_bytes_to_str * move kv keys to constants.py * make pathlib explicit * base-1024 bytes to base-1000 * rename GGUFManager to GGUFWriterSplit * Update gguf-py/gguf/constants.py Co-authored-by: compilade <git@compilade.net> * fix convert-hf-to-gguf.py permissions * fix line endings * Update gguf-py/gguf/gguf_writer_split.py Co-authored-by: compilade <git@compilade.net> * convert-hf : restore executable file permission * examples/convert-legacy-llama.py: restore executable file permission * reinstate original gguf package import and fix type annotation * attempt to appease the linter * attempt 2 to appease the linter * attempt 3 to appease the linter * comma 
consistency * Update convert-hf-to-gguf.py Co-authored-by: compilade <git@compilade.net> * edit cmd line args * use simplification from #7827 * kv/ti data are still wrong * try to refactor kv data (still fails) * fix ti data messiness * tidy up * fix linting * actually make the linter happy * cleanup round 1 * remove SplitStrategy, SplitArguments * appease linter * fix typing and clean up * fix linting * Update gguf-py/gguf/gguf_writer.py Co-authored-by: compilade <git@compilade.net> * progress bar, fix split logic * Update gguf-py/gguf/gguf_writer.py Co-authored-by: compilade <git@compilade.net> * catch oversights * Update gguf-py/gguf/gguf_writer.py Co-authored-by: compilade <git@compilade.net> * Update gguf-py/gguf/gguf_writer.py Co-authored-by: compilade <git@compilade.net> * Update gguf-py/gguf/gguf_writer.py Co-authored-by: compilade <git@compilade.net> * Update gguf-py/gguf/gguf_writer.py Co-authored-by: compilade <git@compilade.net> * Update gguf-py/gguf/gguf_writer.py Co-authored-by: compilade <git@compilade.net> * swap bar orders * Update gguf-py/gguf/gguf_writer.py Co-authored-by: compilade <git@compilade.net> * Update gguf-py/gguf/gguf_writer.py Co-authored-by: compilade <git@compilade.net> * compatibility fix * Update gguf-py/gguf/gguf_writer.py Co-authored-by: compilade <git@compilade.net> * Update convert-hf-to-gguf.py Co-authored-by: compilade <git@compilade.net> --------- Co-authored-by: Brian <mofosyne@gmail.com> Co-authored-by: compilade <git@compilade.net>
731 lines
28 KiB
Python
731 lines
28 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import struct
|
|
import tempfile
|
|
from dataclasses import dataclass
|
|
from enum import Enum, auto
|
|
from pathlib import Path
|
|
from io import BufferedWriter
|
|
from typing import IO, Any, Sequence, Mapping
|
|
from string import ascii_letters, digits
|
|
|
|
import numpy as np
|
|
|
|
from .constants import (
|
|
GGUF_DEFAULT_ALIGNMENT,
|
|
GGUF_MAGIC,
|
|
GGUF_VERSION,
|
|
GGMLQuantizationType,
|
|
GGUFEndian,
|
|
GGUFValueType,
|
|
Keys,
|
|
RopeScalingType,
|
|
PoolingType,
|
|
TokenType,
|
|
)
|
|
|
|
from .quants import quant_shape_from_byte_shape
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Filename pattern used for split outputs. GGUFWriter.format_shard_names fills it
# with (file stem, 1-based shard number, total shard count).
SHARD_NAME_FORMAT = "{:s}-{:05d}-of-{:05d}.gguf"
|
|
|
|
@dataclass
class TensorInfo:
    """Per-tensor metadata kept by GGUFWriter until the tensor is written."""

    # Tensor dimensions as registered by the caller.
    shape: Sequence[int]
    # GGML type tag written into the tensor-info section.
    dtype: GGMLQuantizationType
    # Size of the tensor data in bytes, before alignment padding.
    nbytes: int
    # The data itself; None when only the info was registered, and reset to
    # None by GGUFWriter.write_tensors_to_file once the data has been written.
    tensor: np.ndarray[Any, Any] | None = None
|
|
|
|
@dataclass
class GGUFValue:
    """A metadata value paired with the GGUF value type used to serialize it."""

    # The raw Python value (scalar, string, or sequence).
    value: Any
    # The GGUF type tag that selects how `value` is packed.
    type: GGUFValueType
|
|
|
|
class WriterState(Enum):
    """Progress of a GGUFWriter through its output sections, in write order."""

    # Initial state: no output file has been opened yet.
    NO_FILE = auto()
    # Output file(s) opened, nothing written.
    EMPTY = auto()
    # Header (magic, version, counts) written.
    HEADER = auto()
    # Key/value metadata written.
    KV_DATA = auto()
    # Tensor-info records written.
    TI_DATA = auto()
    # Tensor data is being or has been written.
    WEIGHTS = auto()
|
|
|
|
class GGUFWriter:
    """Collect GGUF metadata and tensors and serialize them to one or more files.

    Key/value pairs and tensor infos are accumulated in memory first. Output is
    then produced in a fixed order tracked by ``self.state``: header, KV data,
    tensor infos, tensor data. Tensors are assigned to shards while they are
    added, according to ``split_max_tensors`` / ``split_max_size``.
    """

    fout: list[BufferedWriter] | None
    path: Path | None
    temp_file: tempfile.SpooledTemporaryFile[bytes] | None
    tensors: list[dict[str, TensorInfo]]
    kv_data: list[dict[str, GGUFValue]]
    state: WriterState
    # struct format characters for the fixed-size scalar value types
    _simple_value_packing = {
        GGUFValueType.UINT8:   "B",
        GGUFValueType.INT8:    "b",
        GGUFValueType.UINT16:  "H",
        GGUFValueType.INT16:   "h",
        GGUFValueType.UINT32:  "I",
        GGUFValueType.INT32:   "i",
        GGUFValueType.FLOAT32: "f",
        GGUFValueType.UINT64:  "Q",
        GGUFValueType.INT64:   "q",
        GGUFValueType.FLOAT64: "d",
        GGUFValueType.BOOL:    "?",
    }

    def __init__(
        self, path: os.PathLike[str] | str | None, arch: str, use_temp_file: bool = False, endianess: GGUFEndian = GGUFEndian.LITTLE,
        split_max_tensors: int = 0, split_max_size: int = 0, dry_run: bool = False, small_first_shard: bool = False
    ):
        """Create a writer; no file is opened until ``open_output_file``.

        Args:
            path: Output path, or None to supply it later.
            arch: Architecture name, stored as metadata and used to format
                per-architecture keys.
            use_temp_file: Spool tensor data to a temporary file in ``add_tensor``
                instead of keeping the arrays in memory.
            endianess: Byte order used when packing values.
            split_max_tensors: Max tensors per shard (0 disables this limit).
            split_max_size: Max tensor bytes per shard (0 disables this limit).
            dry_run: Only log the write plan, then exit the process.
            small_first_shard: Keep the first shard free of tensors.
        """
        self.fout = None
        self.path = Path(path) if path else None
        self.arch = arch
        self.endianess = endianess
        self.data_alignment = GGUF_DEFAULT_ALIGNMENT
        self.use_temp_file = use_temp_file
        self.temp_file = None
        self.tensors = [{}]
        self.kv_data = [{}]
        self.split_max_tensors = split_max_tensors
        self.split_max_size = split_max_size
        self.dry_run = dry_run
        self.small_first_shard = small_first_shard
        logger.info("gguf: This GGUF file is for {0} Endian only".format(
            "Big" if self.endianess == GGUFEndian.BIG else "Little",
        ))
        self.state = WriterState.NO_FILE

        if self.small_first_shard:
            # start a second shard right away so the first one stays tensor-free
            self.tensors.append({})

        self.add_architecture()

    def format_shard_names(self, path: Path) -> list[Path]:
        """Return one output path per shard (``path`` itself when unsplit)."""
        n_shards = len(self.tensors)
        if n_shards == 1:
            return [path]
        return [path.with_name(SHARD_NAME_FORMAT.format(path.stem, i + 1, n_shards)) for i in range(n_shards)]

    def open_output_file(self, path: Path | None = None) -> None:
        """Open every shard file for writing and move to the EMPTY state.

        Raises:
            ValueError: if output was already opened (other than a repeated
                call with the same path while still EMPTY).
        """
        if self.state is WriterState.EMPTY and self.fout is not None and (path is None or path == self.path):
            # allow calling this multiple times as long as the path is the same
            return
        if self.state is not WriterState.NO_FILE:
            raise ValueError(f'Expected output file to be not yet opened, got {self.state}')

        if path is not None:
            self.path = path

        if self.path is not None:
            filenames = self.print_plan()
            opened: list[BufferedWriter] = []
            try:
                for filename in filenames:
                    opened.append(open(filename, "wb"))
            except OSError:
                # don't leak the shards that were already opened
                for fout in opened:
                    fout.close()
                raise
            self.fout = opened
            self.state = WriterState.EMPTY

    def print_plan(self) -> list[Path]:
        """Log the per-shard tensor count and size and return the shard paths.

        In dry-run mode the process exits (``SystemExit``) after logging.
        """
        logger.info("Writing the following files:")
        assert self.path is not None
        filenames = self.format_shard_names(self.path)
        assert len(filenames) == len(self.tensors)
        for name, tensors in zip(filenames, self.tensors):
            logger.info(f"{name}: n_tensors = {len(tensors)}, total_size = {GGUFWriter.format_n_bytes_to_str(sum(ti.nbytes for ti in tensors.values()))}")

        if self.dry_run:
            logger.info("Dry run, not writing files")
            # The bare exit() helper is injected by the `site` module and may be
            # missing (python -S, frozen apps); raise SystemExit directly.
            raise SystemExit(0)

        return filenames

    def add_shard_kv_data(self) -> None:
        """Add split number/count/tensor-count keys to every shard when split."""
        if len(self.tensors) == 1:
            return

        total_tensors = sum(len(t) for t in self.tensors)
        assert self.fout is not None
        total_splits = len(self.fout)
        # every shard needs its own KV dict, even if it only holds split keys
        self.kv_data.extend({} for _ in range(len(self.kv_data), total_splits))
        for i, kv_data in enumerate(self.kv_data):
            kv_data[Keys.Split.LLM_KV_SPLIT_NO] = GGUFValue(i, GGUFValueType.UINT16)
            kv_data[Keys.Split.LLM_KV_SPLIT_COUNT] = GGUFValue(total_splits, GGUFValueType.UINT16)
            kv_data[Keys.Split.LLM_KV_SPLIT_TENSORS_COUNT] = GGUFValue(total_tensors, GGUFValueType.INT32)

    def write_header_to_file(self, path: Path | None = None) -> None:
        """Open the output (if needed) and write the header of every shard.

        Raises:
            ValueError: if the output is not in the EMPTY state after opening.
        """
        if len(self.tensors) == 1 and (self.split_max_tensors != 0 or self.split_max_size != 0):
            logger.warning("Model fails split requirements, not splitting")

        self.open_output_file(path)

        if self.state is not WriterState.EMPTY:
            raise ValueError(f'Expected output file to be empty, got {self.state}')

        assert self.fout is not None
        assert len(self.fout) == len(self.tensors)
        assert len(self.kv_data) == 1

        self.add_shard_kv_data()

        for fout, tensors, kv_data in zip(self.fout, self.tensors, self.kv_data):
            # the magic is always packed little-endian, regardless of self.endianess
            fout.write(self._pack("<I", GGUF_MAGIC, skip_pack_prefix = True))
            fout.write(self._pack("I", GGUF_VERSION))
            fout.write(self._pack("Q", len(tensors)))
            fout.write(self._pack("Q", len(kv_data)))
            fout.flush()
        self.state = WriterState.HEADER

    def write_kv_data_to_file(self) -> None:
        """Write each shard's key/value metadata; requires the HEADER state."""
        if self.state is not WriterState.HEADER:
            raise ValueError(f'Expected output file to contain the header, got {self.state}')
        assert self.fout is not None

        for fout, kv_data in zip(self.fout, self.kv_data):
            kv_bytes = bytearray()

            for key, val in kv_data.items():
                kv_bytes += self._pack_val(key, GGUFValueType.STRING, add_vtype=False)
                kv_bytes += self._pack_val(val.value, val.type, add_vtype=True)

            fout.write(kv_bytes)

        self.flush()
        self.state = WriterState.KV_DATA

    def write_ti_data_to_file(self) -> None:
        """Write each shard's tensor-info records; requires the KV_DATA state."""
        if self.state is not WriterState.KV_DATA:
            raise ValueError(f'Expected output file to contain KV data, got {self.state}')
        assert self.fout is not None

        for fout, tensors in zip(self.fout, self.tensors):
            ti_data = bytearray()
            offset_tensor = 0  # offsets restart at 0 in every shard

            for name, ti in tensors.items():
                ti_data += self._pack_val(name, GGUFValueType.STRING, add_vtype=False)
                n_dims = len(ti.shape)
                ti_data += self._pack("I", n_dims)
                # dimensions are written in reverse order of ti.shape
                for j in range(n_dims):
                    ti_data += self._pack("Q", ti.shape[n_dims - 1 - j])
                ti_data += self._pack("I", ti.dtype)
                ti_data += self._pack("Q", offset_tensor)
                offset_tensor += GGUFWriter.ggml_pad(ti.nbytes, self.data_alignment)

            fout.write(ti_data)
            fout.flush()
        self.state = WriterState.TI_DATA

    def add_key_value(self, key: str, val: Any, vtype: GGUFValueType) -> None:
        """Register a metadata entry in the first shard.

        Raises:
            ValueError: if ``key`` was already added.
        """
        if any(key in kv_data for kv_data in self.kv_data):
            raise ValueError(f'Duplicated key name {key!r}')

        self.kv_data[0][key] = GGUFValue(value=val, type=vtype)

    def add_uint8(self, key: str, val: int) -> None:
        """Add ``val`` under ``key`` as a UINT8."""
        self.add_key_value(key, val, GGUFValueType.UINT8)

    def add_int8(self, key: str, val: int) -> None:
        """Add ``val`` under ``key`` as an INT8."""
        self.add_key_value(key, val, GGUFValueType.INT8)

    def add_uint16(self, key: str, val: int) -> None:
        """Add ``val`` under ``key`` as a UINT16."""
        self.add_key_value(key, val, GGUFValueType.UINT16)

    def add_int16(self, key: str, val: int) -> None:
        """Add ``val`` under ``key`` as an INT16."""
        self.add_key_value(key, val, GGUFValueType.INT16)

    def add_uint32(self, key: str, val: int) -> None:
        """Add ``val`` under ``key`` as a UINT32."""
        self.add_key_value(key, val, GGUFValueType.UINT32)

    def add_int32(self, key: str, val: int) -> None:
        """Add ``val`` under ``key`` as an INT32."""
        self.add_key_value(key, val, GGUFValueType.INT32)

    def add_float32(self, key: str, val: float) -> None:
        """Add ``val`` under ``key`` as a FLOAT32."""
        self.add_key_value(key, val, GGUFValueType.FLOAT32)

    def add_uint64(self, key: str, val: int) -> None:
        """Add ``val`` under ``key`` as a UINT64."""
        self.add_key_value(key, val, GGUFValueType.UINT64)

    def add_int64(self, key: str, val: int) -> None:
        """Add ``val`` under ``key`` as an INT64."""
        self.add_key_value(key, val, GGUFValueType.INT64)

    def add_float64(self, key: str, val: float) -> None:
        """Add ``val`` under ``key`` as a FLOAT64."""
        self.add_key_value(key, val, GGUFValueType.FLOAT64)

    def add_bool(self, key: str, val: bool) -> None:
        """Add ``val`` under ``key`` as a BOOL."""
        self.add_key_value(key, val, GGUFValueType.BOOL)

    def add_string(self, key: str, val: str) -> None:
        """Add ``val`` under ``key`` as a STRING; empty values are skipped."""
        if not val:
            return
        self.add_key_value(key, val, GGUFValueType.STRING)

    def add_array(self, key: str, val: Sequence[Any]) -> None:
        """Add ``val`` under ``key`` as an ARRAY."""
        self.add_key_value(key, val, GGUFValueType.ARRAY)

    @staticmethod
    def ggml_pad(x: int, n: int) -> int:
        """Round ``x`` up to the next multiple of ``n``."""
        return ((x + n - 1) // n) * n

    def add_tensor_info(
        self, name: str, tensor_shape: Sequence[int], tensor_dtype: np.dtype,
        tensor_nbytes: int, raw_dtype: GGMLQuantizationType | None = None,
    ) -> None:
        """Register a tensor's metadata, starting a new shard when a limit is hit.

        Raises:
            ValueError: if output was already opened, the name is duplicated, or
                ``tensor_dtype`` is unsupported and no ``raw_dtype`` is given.
        """
        if self.state is not WriterState.NO_FILE:
            raise ValueError(f'Expected output file to be not yet opened, got {self.state}')

        if any(name in tensors for tensors in self.tensors):
            raise ValueError(f'Duplicated tensor name {name!r}')

        if raw_dtype is None:
            if tensor_dtype == np.float16:
                dtype = GGMLQuantizationType.F16
            elif tensor_dtype == np.float32:
                dtype = GGMLQuantizationType.F32
            elif tensor_dtype == np.float64:
                dtype = GGMLQuantizationType.F64
            elif tensor_dtype == np.int8:
                dtype = GGMLQuantizationType.I8
            elif tensor_dtype == np.int16:
                dtype = GGMLQuantizationType.I16
            elif tensor_dtype == np.int32:
                dtype = GGMLQuantizationType.I32
            elif tensor_dtype == np.int64:
                dtype = GGMLQuantizationType.I64
            else:
                raise ValueError("Only F16, F32, F64, I8, I16, I32, I64 tensors are supported for now")
        else:
            dtype = raw_dtype
            if tensor_dtype == np.uint8:
                # uint8 data with an explicit raw type: convert the byte shape
                tensor_shape = quant_shape_from_byte_shape(tensor_shape, raw_dtype)

        # make sure there is at least one tensor before splitting
        if len(self.tensors[-1]) > 0:
            if (  # split when over tensor limit
                self.split_max_tensors != 0
                and len(self.tensors[-1]) >= self.split_max_tensors
            ) or (   # split when over size limit
                self.split_max_size != 0
                and sum(ti.nbytes for ti in self.tensors[-1].values()) + tensor_nbytes > self.split_max_size
            ):
                self.tensors.append({})

        self.tensors[-1][name] = TensorInfo(shape=tensor_shape, dtype=dtype, nbytes=tensor_nbytes)

    def add_tensor(
        self, name: str, tensor: np.ndarray[Any, Any], raw_shape: Sequence[int] | None = None,
        raw_dtype: GGMLQuantizationType | None = None,
    ) -> None:
        """Register a tensor together with its data.

        The data is kept in memory, or appended (padded) to a spooled temporary
        file when ``use_temp_file`` is set. For big-endian output the array is
        byte-swapped in place.
        """
        if self.endianess == GGUFEndian.BIG:
            tensor.byteswap(inplace=True)
        if self.use_temp_file and self.temp_file is None:
            fp = tempfile.SpooledTemporaryFile(mode="w+b", max_size=256 * 1024 * 1024)
            fp.seek(0)
            self.temp_file = fp

        shape: Sequence[int] = raw_shape if raw_shape is not None else tensor.shape
        self.add_tensor_info(name, shape, tensor.dtype, tensor.nbytes, raw_dtype=raw_dtype)

        if self.temp_file is None:
            self.tensors[-1][name].tensor = tensor
            return

        tensor.tofile(self.temp_file)
        self.write_padding(self.temp_file, tensor.nbytes)

    def write_padding(self, fp: IO[bytes], n: int, align: int | None = None) -> None:
        """Write the zero bytes needed to pad ``n`` up to the alignment."""
        pad = GGUFWriter.ggml_pad(n, align if align is not None else self.data_alignment) - n
        if pad != 0:
            fp.write(bytes(pad))

    def write_tensor_data(self, tensor: np.ndarray[Any, Any]) -> None:
        """Write the data of the next pending tensor (in registration order).

        Raises:
            ValueError: if tensor infos have not been written yet, or if no
                registered tensor info is left to match ``tensor`` against.
        """
        if self.state is not WriterState.TI_DATA and self.state is not WriterState.WEIGHTS:
            raise ValueError(f'Expected output file to contain tensor info or weights, got {self.state}')
        assert self.fout is not None

        # first shard that still has pending tensor infos
        file_id = next((i for i, tensors in enumerate(self.tensors) if tensors), None)
        if file_id is None:
            raise ValueError('Expected a pending tensor info, but all tensors were already written')

        if self.endianess == GGUFEndian.BIG:
            tensor.byteswap(inplace=True)

        fout = self.fout[file_id]

        # pop the first tensor info (dicts preserve insertion order)
        first_tensor_name = next(iter(self.tensors[file_id]))
        ti = self.tensors[file_id].pop(first_tensor_name)
        assert ti.nbytes == tensor.nbytes

        self.write_padding(fout, fout.tell())
        tensor.tofile(fout)
        self.write_padding(fout, tensor.nbytes)

        self.state = WriterState.WEIGHTS

    def write_tensors_to_file(self, *, progress: bool = False) -> None:
        """Write tensor infos, then all tensor data, to the shard files.

        Args:
            progress: Show tqdm progress bars (in-memory tensors only).
        """
        self.write_ti_data_to_file()

        assert self.fout is not None

        for fout in self.fout:
            self.write_padding(fout, fout.tell())

        if self.temp_file is None:
            shard_bar = None
            bar = None

            if progress:
                from tqdm import tqdm

                total_bytes = sum(ti.nbytes for t in self.tensors for ti in t.values())

                if len(self.fout) > 1:
                    shard_bar = tqdm(desc=f"Shard (0/{len(self.fout)})", total=None, unit="byte", unit_scale=True)
                bar = tqdm(desc="Writing", total=total_bytes, unit="byte", unit_scale=True)

            for i, (fout, tensors) in enumerate(zip(self.fout, self.tensors)):
                if shard_bar is not None:
                    shard_bar.set_description(f"Shard ({i + 1}/{len(self.fout)})")
                    total = sum(ti.nbytes for ti in tensors.values())
                    shard_bar.reset(total=(total if total > 0 else None))

                # relying on the fact that Python dicts preserve insertion order (since 3.7)
                for ti in tensors.values():
                    assert ti.tensor is not None  # can only iterate once over the tensors
                    assert ti.tensor.nbytes == ti.nbytes
                    ti.tensor.tofile(fout)
                    if shard_bar is not None:
                        shard_bar.update(ti.nbytes)
                    if bar is not None:
                        bar.update(ti.nbytes)
                    self.write_padding(fout, ti.nbytes)
                    ti.tensor = None  # drop the reference so the array can be freed
        else:
            self.temp_file.seek(0)

            # the spooled data is copied as a whole into a single shard
            shutil.copyfileobj(self.temp_file, self.fout[0 if not self.small_first_shard else 1])
            self.flush()
            self.temp_file.close()

        self.state = WriterState.WEIGHTS

    def flush(self) -> None:
        """Flush every open shard file."""
        assert self.fout is not None
        for fout in self.fout:
            fout.flush()

    def close(self) -> None:
        """Close every open shard file; safe to call more than once."""
        if self.fout is not None:
            for fout in self.fout:
                fout.close()
            self.fout = None

    def add_architecture(self) -> None:
        """Store ``self.arch`` under ``Keys.General.ARCHITECTURE``."""
        self.add_string(Keys.General.ARCHITECTURE, self.arch)

    def add_author(self, author: str) -> None:
        """Store ``author`` under ``Keys.General.AUTHOR``."""
        self.add_string(Keys.General.AUTHOR, author)

    def add_version(self, version: str) -> None:
        """Store ``version`` under ``Keys.General.VERSION``."""
        self.add_string(Keys.General.VERSION, version)

    def add_tensor_data_layout(self, layout: str) -> None:
        """Store ``layout`` under the per-arch ``Keys.LLM.TENSOR_DATA_LAYOUT``."""
        self.add_string(Keys.LLM.TENSOR_DATA_LAYOUT.format(arch=self.arch), layout)

    def add_url(self, url: str) -> None:
        """Store ``url`` under ``Keys.General.URL``."""
        self.add_string(Keys.General.URL, url)

    def add_description(self, description: str) -> None:
        """Store ``description`` under ``Keys.General.DESCRIPTION``."""
        self.add_string(Keys.General.DESCRIPTION, description)

    def add_licence(self, licence: str) -> None:
        """Store ``licence`` under ``Keys.General.LICENSE``."""
        self.add_string(Keys.General.LICENSE, licence)

    def add_source_url(self, url: str) -> None:
        """Store ``url`` under ``Keys.General.SOURCE_URL``."""
        self.add_string(Keys.General.SOURCE_URL, url)

    def add_source_hf_repo(self, repo: str) -> None:
        """Store ``repo`` under ``Keys.General.SOURCE_HF_REPO``."""
        self.add_string(Keys.General.SOURCE_HF_REPO, repo)

    def add_file_type(self, ftype: int) -> None:
        """Store ``ftype`` under ``Keys.General.FILE_TYPE``."""
        self.add_uint32(Keys.General.FILE_TYPE, ftype)

    def add_name(self, name: str) -> None:
        """Store ``name`` under ``Keys.General.NAME``."""
        self.add_string(Keys.General.NAME, name)

    def add_quantization_version(self, quantization_version: int) -> None:
        """Store the value under ``Keys.General.QUANTIZATION_VERSION``."""
        self.add_uint32(
            Keys.General.QUANTIZATION_VERSION, quantization_version)

    def add_custom_alignment(self, alignment: int) -> None:
        """Use ``alignment`` for padding and store it under ``Keys.General.ALIGNMENT``."""
        self.data_alignment = alignment
        self.add_uint32(Keys.General.ALIGNMENT, alignment)

    def add_vocab_size(self, size: int) -> None:
        """Store ``size`` under the per-arch ``Keys.LLM.VOCAB_SIZE``."""
        self.add_uint32(Keys.LLM.VOCAB_SIZE.format(arch=self.arch), size)

    def add_context_length(self, length: int) -> None:
        """Store ``length`` under the per-arch ``Keys.LLM.CONTEXT_LENGTH``."""
        self.add_uint32(Keys.LLM.CONTEXT_LENGTH.format(arch=self.arch), length)

    def add_embedding_length(self, length: int) -> None:
        """Store ``length`` under the per-arch ``Keys.LLM.EMBEDDING_LENGTH``."""
        self.add_uint32(Keys.LLM.EMBEDDING_LENGTH.format(arch=self.arch), length)

    def add_block_count(self, length: int) -> None:
        """Store ``length`` under the per-arch ``Keys.LLM.BLOCK_COUNT``."""
        self.add_uint32(Keys.LLM.BLOCK_COUNT.format(arch=self.arch), length)

    def add_leading_dense_block_count(self, length: int) -> None:
        """Store ``length`` under the per-arch ``Keys.LLM.LEADING_DENSE_BLOCK_COUNT``."""
        self.add_uint32(Keys.LLM.LEADING_DENSE_BLOCK_COUNT.format(arch=self.arch), length)

    def add_feed_forward_length(self, length: int) -> None:
        """Store ``length`` under the per-arch ``Keys.LLM.FEED_FORWARD_LENGTH``."""
        self.add_uint32(Keys.LLM.FEED_FORWARD_LENGTH.format(arch=self.arch), length)

    def add_expert_feed_forward_length(self, length: int) -> None:
        """Store ``length`` under the per-arch ``Keys.LLM.EXPERT_FEED_FORWARD_LENGTH``."""
        self.add_uint32(Keys.LLM.EXPERT_FEED_FORWARD_LENGTH.format(arch=self.arch), length)

    def add_expert_shared_feed_forward_length(self, length: int) -> None:
        """Store ``length`` under the per-arch ``Keys.LLM.EXPERT_SHARED_FEED_FORWARD_LENGTH``."""
        self.add_uint32(Keys.LLM.EXPERT_SHARED_FEED_FORWARD_LENGTH.format(arch=self.arch), length)

    def add_parallel_residual(self, use: bool) -> None:
        """Store ``use`` under the per-arch ``Keys.LLM.USE_PARALLEL_RESIDUAL``."""
        self.add_bool(Keys.LLM.USE_PARALLEL_RESIDUAL.format(arch=self.arch), use)

    def add_decoder_start_token_id(self, id: int) -> None:
        """Store ``id`` under the per-arch ``Keys.LLM.DECODER_START_TOKEN_ID``."""
        self.add_uint32(Keys.LLM.DECODER_START_TOKEN_ID.format(arch=self.arch), id)

    def add_head_count(self, count: int) -> None:
        """Store ``count`` under the per-arch ``Keys.Attention.HEAD_COUNT``."""
        self.add_uint32(Keys.Attention.HEAD_COUNT.format(arch=self.arch), count)

    def add_head_count_kv(self, count: int) -> None:
        """Store ``count`` under the per-arch ``Keys.Attention.HEAD_COUNT_KV``."""
        self.add_uint32(Keys.Attention.HEAD_COUNT_KV.format(arch=self.arch), count)

    def add_key_length(self, length: int) -> None:
        """Store ``length`` under the per-arch ``Keys.Attention.KEY_LENGTH``."""
        self.add_uint32(Keys.Attention.KEY_LENGTH.format(arch=self.arch), length)

    def add_value_length(self, length: int) -> None:
        """Store ``length`` under the per-arch ``Keys.Attention.VALUE_LENGTH``."""
        self.add_uint32(Keys.Attention.VALUE_LENGTH.format(arch=self.arch), length)

    def add_max_alibi_bias(self, bias: float) -> None:
        """Store ``bias`` under the per-arch ``Keys.Attention.MAX_ALIBI_BIAS``."""
        self.add_float32(Keys.Attention.MAX_ALIBI_BIAS.format(arch=self.arch), bias)

    def add_clamp_kqv(self, value: float) -> None:
        """Store ``value`` under the per-arch ``Keys.Attention.CLAMP_KQV``."""
        self.add_float32(Keys.Attention.CLAMP_KQV.format(arch=self.arch), value)

    def add_logit_scale(self, value: float) -> None:
        """Store ``value`` under the per-arch ``Keys.LLM.LOGIT_SCALE``."""
        self.add_float32(Keys.LLM.LOGIT_SCALE.format(arch=self.arch), value)

    def add_expert_count(self, count: int) -> None:
        """Store ``count`` under the per-arch ``Keys.LLM.EXPERT_COUNT``."""
        self.add_uint32(Keys.LLM.EXPERT_COUNT.format(arch=self.arch), count)

    def add_expert_used_count(self, count: int) -> None:
        """Store ``count`` under the per-arch ``Keys.LLM.EXPERT_USED_COUNT``."""
        self.add_uint32(Keys.LLM.EXPERT_USED_COUNT.format(arch=self.arch), count)

    def add_expert_shared_count(self, count: int) -> None:
        """Store ``count`` under the per-arch ``Keys.LLM.EXPERT_SHARED_COUNT``."""
        self.add_uint32(Keys.LLM.EXPERT_SHARED_COUNT.format(arch=self.arch), count)

    def add_expert_weights_scale(self, value: float) -> None:
        """Store ``value`` under the per-arch ``Keys.LLM.EXPERT_WEIGHTS_SCALE``."""
        self.add_float32(Keys.LLM.EXPERT_WEIGHTS_SCALE.format(arch=self.arch), value)

    def add_layer_norm_eps(self, value: float) -> None:
        """Store ``value`` under the per-arch ``Keys.Attention.LAYERNORM_EPS``."""
        self.add_float32(Keys.Attention.LAYERNORM_EPS.format(arch=self.arch), value)

    def add_layer_norm_rms_eps(self, value: float) -> None:
        """Store ``value`` under the per-arch ``Keys.Attention.LAYERNORM_RMS_EPS``."""
        self.add_float32(Keys.Attention.LAYERNORM_RMS_EPS.format(arch=self.arch), value)

    def add_causal_attention(self, value: bool) -> None:
        """Store ``value`` under the per-arch ``Keys.Attention.CAUSAL``."""
        self.add_bool(Keys.Attention.CAUSAL.format(arch=self.arch), value)

    def add_q_lora_rank(self, length: int) -> None:
        """Store ``length`` under the per-arch ``Keys.Attention.Q_LORA_RANK``."""
        self.add_uint32(Keys.Attention.Q_LORA_RANK.format(arch=self.arch), length)

    def add_kv_lora_rank(self, length: int) -> None:
        """Store ``length`` under the per-arch ``Keys.Attention.KV_LORA_RANK``."""
        self.add_uint32(Keys.Attention.KV_LORA_RANK.format(arch=self.arch), length)

    def add_relative_attn_buckets_count(self, value: int) -> None:
        """Store ``value`` under the per-arch ``Keys.Attention.REL_BUCKETS_COUNT``."""
        self.add_uint32(Keys.Attention.REL_BUCKETS_COUNT.format(arch=self.arch), value)

    def add_pooling_type(self, value: PoolingType) -> None:
        """Store ``value.value`` under the per-arch ``Keys.LLM.POOLING_TYPE``."""
        self.add_uint32(Keys.LLM.POOLING_TYPE.format(arch=self.arch), value.value)

    def add_rope_dimension_count(self, count: int) -> None:
        """Store ``count`` under the per-arch ``Keys.Rope.DIMENSION_COUNT``."""
        self.add_uint32(Keys.Rope.DIMENSION_COUNT.format(arch=self.arch), count)

    def add_rope_freq_base(self, value: float) -> None:
        """Store ``value`` under the per-arch ``Keys.Rope.FREQ_BASE``."""
        self.add_float32(Keys.Rope.FREQ_BASE.format(arch=self.arch), value)

    def add_rope_scaling_type(self, value: RopeScalingType) -> None:
        """Store ``value.value`` under the per-arch ``Keys.Rope.SCALING_TYPE``."""
        self.add_string(Keys.Rope.SCALING_TYPE.format(arch=self.arch), value.value)

    def add_rope_scaling_factor(self, value: float) -> None:
        """Store ``value`` under the per-arch ``Keys.Rope.SCALING_FACTOR``."""
        self.add_float32(Keys.Rope.SCALING_FACTOR.format(arch=self.arch), value)

    def add_rope_scaling_attn_factors(self, value: float) -> None:
        """Store ``value`` under the per-arch ``Keys.Rope.SCALING_ATTN_FACTOR``."""
        self.add_float32(Keys.Rope.SCALING_ATTN_FACTOR.format(arch=self.arch), value)

    def add_rope_scaling_orig_ctx_len(self, value: int) -> None:
        """Store ``value`` under the per-arch ``Keys.Rope.SCALING_ORIG_CTX_LEN``."""
        self.add_uint32(Keys.Rope.SCALING_ORIG_CTX_LEN.format(arch=self.arch), value)

    def add_rope_scaling_finetuned(self, value: bool) -> None:
        """Store ``value`` under the per-arch ``Keys.Rope.SCALING_FINETUNED``."""
        self.add_bool(Keys.Rope.SCALING_FINETUNED.format(arch=self.arch), value)

    def add_rope_scaling_yarn_log_mul(self, value: float) -> None:
        """Store ``value`` under the per-arch ``Keys.Rope.SCALING_YARN_LOG_MUL``."""
        self.add_float32(Keys.Rope.SCALING_YARN_LOG_MUL.format(arch=self.arch), value)

    def add_ssm_conv_kernel(self, value: int) -> None:
        """Store ``value`` under the per-arch ``Keys.SSM.CONV_KERNEL``."""
        self.add_uint32(Keys.SSM.CONV_KERNEL.format(arch=self.arch), value)

    def add_ssm_inner_size(self, value: int) -> None:
        """Store ``value`` under the per-arch ``Keys.SSM.INNER_SIZE``."""
        self.add_uint32(Keys.SSM.INNER_SIZE.format(arch=self.arch), value)

    def add_ssm_state_size(self, value: int) -> None:
        """Store ``value`` under the per-arch ``Keys.SSM.STATE_SIZE``."""
        self.add_uint32(Keys.SSM.STATE_SIZE.format(arch=self.arch), value)

    def add_ssm_time_step_rank(self, value: int) -> None:
        """Store ``value`` under the per-arch ``Keys.SSM.TIME_STEP_RANK``."""
        self.add_uint32(Keys.SSM.TIME_STEP_RANK.format(arch=self.arch), value)

    def add_tokenizer_model(self, model: str) -> None:
        """Store ``model`` under ``Keys.Tokenizer.MODEL``."""
        self.add_string(Keys.Tokenizer.MODEL, model)

    def add_tokenizer_pre(self, pre: str) -> None:
        """Store ``pre`` under ``Keys.Tokenizer.PRE``."""
        self.add_string(Keys.Tokenizer.PRE, pre)

    def add_token_list(self, tokens: Sequence[str] | Sequence[bytes] | Sequence[bytearray]) -> None:
        """Store ``tokens`` as an array under ``Keys.Tokenizer.LIST``."""
        self.add_array(Keys.Tokenizer.LIST, tokens)

    def add_token_merges(self, merges: Sequence[str] | Sequence[bytes] | Sequence[bytearray]) -> None:
        """Store ``merges`` as an array under ``Keys.Tokenizer.MERGES``."""
        self.add_array(Keys.Tokenizer.MERGES, merges)

    def add_token_types(self, types: Sequence[TokenType] | Sequence[int]) -> None:
        """Store ``types`` as an array under ``Keys.Tokenizer.TOKEN_TYPE``."""
        self.add_array(Keys.Tokenizer.TOKEN_TYPE, types)

    def add_token_type_count(self, value: int) -> None:
        """Store ``value`` under ``Keys.Tokenizer.TOKEN_TYPE_COUNT``."""
        self.add_uint32(Keys.Tokenizer.TOKEN_TYPE_COUNT, value)

    def add_token_scores(self, scores: Sequence[float]) -> None:
        """Store ``scores`` as an array under ``Keys.Tokenizer.SCORES``."""
        self.add_array(Keys.Tokenizer.SCORES, scores)

    def add_bos_token_id(self, id: int) -> None:
        """Store ``id`` under ``Keys.Tokenizer.BOS_ID``."""
        self.add_uint32(Keys.Tokenizer.BOS_ID, id)

    def add_eos_token_id(self, id: int) -> None:
        """Store ``id`` under ``Keys.Tokenizer.EOS_ID``."""
        self.add_uint32(Keys.Tokenizer.EOS_ID, id)

    def add_unk_token_id(self, id: int) -> None:
        """Store ``id`` under ``Keys.Tokenizer.UNK_ID``."""
        self.add_uint32(Keys.Tokenizer.UNK_ID, id)

    def add_sep_token_id(self, id: int) -> None:
        """Store ``id`` under ``Keys.Tokenizer.SEP_ID``."""
        self.add_uint32(Keys.Tokenizer.SEP_ID, id)

    def add_pad_token_id(self, id: int) -> None:
        """Store ``id`` under ``Keys.Tokenizer.PAD_ID``."""
        self.add_uint32(Keys.Tokenizer.PAD_ID, id)

    def add_cls_token_id(self, id: int) -> None:
        """Store ``id`` under ``Keys.Tokenizer.CLS_ID``."""
        self.add_uint32(Keys.Tokenizer.CLS_ID, id)

    def add_mask_token_id(self, id: int) -> None:
        """Store ``id`` under ``Keys.Tokenizer.MASK_ID``."""
        self.add_uint32(Keys.Tokenizer.MASK_ID, id)

    def add_add_bos_token(self, value: bool) -> None:
        """Store ``value`` under ``Keys.Tokenizer.ADD_BOS``."""
        self.add_bool(Keys.Tokenizer.ADD_BOS, value)

    def add_add_eos_token(self, value: bool) -> None:
        """Store ``value`` under ``Keys.Tokenizer.ADD_EOS``."""
        self.add_bool(Keys.Tokenizer.ADD_EOS, value)

    def add_add_space_prefix(self, value: bool) -> None:
        """Store ``value`` under ``Keys.Tokenizer.ADD_PREFIX``."""
        self.add_bool(Keys.Tokenizer.ADD_PREFIX, value)

    def add_remove_extra_whitespaces(self, value: bool) -> None:
        """Store ``value`` under ``Keys.Tokenizer.REMOVE_EXTRA_WS``."""
        self.add_bool(Keys.Tokenizer.REMOVE_EXTRA_WS, value)

    def add_precompiled_charsmap(self, charsmap: Sequence[bytes]) -> None:
        """Store ``charsmap`` as an array under ``Keys.Tokenizer.PRECOMPILED_CHARSMAP``."""
        self.add_array(Keys.Tokenizer.PRECOMPILED_CHARSMAP, charsmap)

    def add_chat_template(self, value: str | Sequence[Mapping[str, str]]) -> None:
        """Store a chat template, or a set of named chat templates.

        Args:
            value: Either the template string, or a sequence of mappings with
                ``name`` and ``template`` entries. The entry named ``default``
                becomes the main template; the others are stored under
                per-name keys and listed (sorted) in ``CHAT_TEMPLATES``.
        """
        if not isinstance(value, str):
            template_default = None
            template_names = set()
            allowed_chars = frozenset(ascii_letters + digits)

            for choice in value:
                name = choice.get('name', '')
                template = choice.get('template')

                # Allowing non-alphanumerical characters in template name is probably not a good idea, so filter it
                name = ''.join(c if c in allowed_chars else '_' for c in name)

                if name and template is not None:
                    if name == 'default':
                        template_default = template
                    else:
                        template_names.add(name)
                        self.add_string(Keys.Tokenizer.CHAT_TEMPLATE_N.format(name=name), template)

            if template_names:
                # sorted: set iteration order varies between runs, which would
                # make the written file non-reproducible
                self.add_array(Keys.Tokenizer.CHAT_TEMPLATES, sorted(template_names))

            if template_default is None:
                return

            value = template_default

        self.add_string(Keys.Tokenizer.CHAT_TEMPLATE, value)

    def add_prefix_token_id(self, id: int) -> None:
        """Store ``id`` under ``Keys.Tokenizer.PREFIX_ID``."""
        self.add_uint32(Keys.Tokenizer.PREFIX_ID, id)

    def add_suffix_token_id(self, id: int) -> None:
        """Store ``id`` under ``Keys.Tokenizer.SUFFIX_ID``."""
        self.add_uint32(Keys.Tokenizer.SUFFIX_ID, id)

    def add_middle_token_id(self, id: int) -> None:
        """Store ``id`` under ``Keys.Tokenizer.MIDDLE_ID``."""
        self.add_uint32(Keys.Tokenizer.MIDDLE_ID, id)

    def add_eot_token_id(self, id: int) -> None:
        """Store ``id`` under ``Keys.Tokenizer.EOT_ID``."""
        self.add_uint32(Keys.Tokenizer.EOT_ID, id)

    def _pack(self, fmt: str, value: Any, skip_pack_prefix: bool = False) -> bytes:
        """``struct.pack`` one value, prefixing the writer's byte order unless skipped."""
        pack_prefix = ''
        if not skip_pack_prefix:
            pack_prefix = '<' if self.endianess == GGUFEndian.LITTLE else '>'
        return struct.pack(f'{pack_prefix}{fmt}', value)

    def _pack_val(self, val: Any, vtype: GGUFValueType, add_vtype: bool) -> bytes:
        """Serialize one metadata value.

        Args:
            val: The value to serialize.
            vtype: Its GGUF value type.
            add_vtype: Emit the type tag before the value.

        Raises:
            ValueError: for mixed-type arrays, empty arrays, or unsupported
                type/value combinations.
        """
        kv_data = bytearray()

        if add_vtype:
            kv_data += self._pack("I", vtype)

        pack_fmt = self._simple_value_packing.get(vtype)
        if pack_fmt is not None:
            # single-byte bools need no byte-order prefix
            kv_data += self._pack(pack_fmt, val, skip_pack_prefix = vtype == GGUFValueType.BOOL)
        elif vtype == GGUFValueType.STRING:
            encoded_val = val.encode("utf-8") if isinstance(val, str) else val
            kv_data += self._pack("Q", len(encoded_val))
            kv_data += encoded_val
        elif vtype == GGUFValueType.ARRAY and isinstance(val, Sequence) and val:
            if isinstance(val, bytes):
                ltype = GGUFValueType.UINT8
            else:
                ltype = GGUFValueType.get_type(val[0])
                # iterate val directly: slicing val[1:] would copy the whole array
                if not all(GGUFValueType.get_type(i) is ltype for i in val):
                    raise ValueError("All items in a GGUF array should be of the same type")
            kv_data += self._pack("I", ltype)
            kv_data += self._pack("Q", len(val))
            for item in val:
                kv_data += self._pack_val(item, ltype, add_vtype=False)
        else:
            raise ValueError("Invalid GGUF metadata value type or value")

        return kv_data

    @staticmethod
    def format_n_bytes_to_str(num: int) -> str:
        """Format a byte count with base-1000 units for the write plan log."""
        if num == 0:
            return "negligible - metadata only"
        fnum = float(num)
        for unit in ("", "K", "M", "G"):
            if abs(fnum) < 1000.0:
                return f"{fnum:3.1f}{unit}"
            fnum /= 1000.0
        return f"{fnum:.1f}T - over 1TB, split recommended"