Skip to content

llmcompressor.pipelines

Compression pipelines for orchestrating different compression strategies.

Provides various compression pipelines including basic, sequential, independent, layer-sequential, and data-free approaches. Each pipeline coordinates different compression techniques and workflows for optimal model optimization based on specific requirements and constraints.

Modules:

Classes:

Functions:

BasicPipeline

CalibrationPipeline

Bases: ABC, RegistryMixin

Methods:

  • from_modifiers

    Infer which calibration pipeline to use based on the available modifiers and

from_modifiers classmethod

from_modifiers(
    modifiers: list[Modifier], user: str | None = None
) -> CalibrationPipeline

Infer which calibration pipeline to use based on the available modifiers and any user specifications

Parameters:

  • modifiers

    (list[Modifier]) –

    modifiers to apply to model

  • user

    (str | None, default: None ) –

    pipeline name passed by user

Returns:

  • CalibrationPipeline

    CalibrationPipeline instance to be called with data (if not datafree)

Source code in llmcompressor/pipelines/registry.py
@classmethod
def from_modifiers(
    cls, modifiers: list[Modifier], user: str | None = None
) -> "CalibrationPipeline":
    """
    Infer which calibration pipeline to use based on the available modifiers and
    any user specifications

    :param modifiers: modifiers to apply to model
    :param user: pipeline name passed by user
    :return: CalibrationPipeline instance to be called with data (if not datafree)
    """
    user = standardize_lookup_name(user) if user else None
    inferred = standardize_lookup_name(cls._infer_pipeline(modifiers))
    independent = standardize_lookup_name("independent")

    if user == independent:
        inferred = independent

    if user is not None and user != inferred:
        logger.warning(
            f"Calibration pipeline is set to `{user}`, but it is recommended to "
            f"use `{inferred}`"
        )

    pipeline = user or inferred
    return cls.load_from_registry(pipeline)

DataFreePipeline

IndependentPipeline

SequentialPipeline

Subgraph dataclass

Subgraph(
    graph: Graph,
    input_names: set[str],
    consumed_names: set[str],
    _code: PythonCode | None = None,
)

Dataclass specifying an executable subgraph of a model graph

Parameters:

  • graph

    (Graph) –

    subgraph of model graph

  • input_names

    (set[str]) –

    argument names of the compiled forward function

  • consumed_names

    (set[str]) –

    argument names which are not used by any subsequent subgraphs and can therefore be deleted from the intermediates cache

Methods:

  • forward

    Execute the operations within the subgraph

forward

forward(*args, **kwargs) -> dict[str, Any]

Execute the operations within the subgraph

Parameters:

  • \*args

    argument inputs to subgraph forward function

  • \**kwargs

    keyword inputs to subgraph forward function

Returns:

  • dict[str, Any]
Source code in llmcompressor/pipelines/sequential/helpers.py
def forward(self, *args, **kwargs) -> dict[str, Any]:
    """
    Execute the operations within the subgraph

    :param \\*args: argument inputs to subgraph forward function
    :param \\**kwargs: keyword inputs to subgraph forward function
    :return keyword outputs of subgraph forward function (non-consumed variables):
    """
    if self._code is None:
        self._code = self.graph.python_code("self")
        exec(self._code.src, self._code.globals)

    forward_fn = self._code.globals.get("forward")

    with append_autowrap_source_on_fail():
        return forward_fn(*args, **kwargs)

dispatch_for_sequential

dispatch_for_sequential(
    model: PreTrainedModel,
    onload_device: Optional[device | str] = None,
    offload_device: Optional[device | str] = None,
) -> PreTrainedModel

Dispatch a model for sequential calibration using a sequential pipeline. The model will be offloaded to the CPU and dispatched to CUDA/XPU device if available. Removes any existing hooks.

Parameters:

  • model

    (PreTrainedModel) –

    model to dispatch

Returns:

  • PreTrainedModel

    dispatched model

Source code in llmcompressor/pipelines/sequential/helpers.py
def dispatch_for_sequential(
    model: PreTrainedModel,
    onload_device: Optional[torch.device | str] = None,
    offload_device: Optional[torch.device | str] = None,
) -> PreTrainedModel:
    """
    Dispatch a model for sequential calibration using a sequential pipeline.
    The model will be offloaded to the CPU and dispatched to CUDA/XPU device
    if available. Removes any existing hooks.

    :param model: model to dispatch
    :return: dispatched model
    """
    if onload_device is None:
        onload_device = get_main_device()
    return offload_model(model, onload_device, offload_device)

get_sequential_targets

get_sequential_targets(
    modifiers: list[Modifier],
    model: PreTrainedModel,
    args: DatasetArguments,
) -> list[str]

Infer sequential targets from modifiers list and dataset args

Parameters:

  • model

    (PreTrainedModel) –

    model being calibrated

  • modifiers

    (list[Modifier]) –

    list of modifiers being applied during calibration

  • dataset_args

    dataset arguments passed by user

Returns:

  • list[str]

    list of sequential targets

Source code in llmcompressor/pipelines/sequential/helpers.py
def get_sequential_targets(
    modifiers: list[Modifier], model: PreTrainedModel, args: "DatasetArguments"
) -> list[str]:
    """
    Infer sequential targets from modifiers list and dataset args

    :param model: model being calibrated
    :param modifiers: list of modifiers being applied during calibration
    :param dataset_args: dataset arguments passed by user
    :return: list of sequential targets
    """
    modifier_targets = [
        (modifier, modifier.sequential_targets)
        for modifier in modifiers
        if getattr(modifier, "sequential_targets", None) is not None
    ]

    # deprecation warning
    if len(modifier_targets) >= 1:
        logger.warning(
            "Passing sequential targets through modifiers is deprecated, "
            "please use `oneshot(sequential_targets=...)`"
        )

    # cannot infer from multiple modifiers
    if len(modifier_targets) >= 2:
        types = [type(modifier) for modifier, _ in modifier_targets]
        raise ValueError(
            "Cannot infer sequential targets from multiple sequential modifiers "
            f"({types})"
        )

    # resolve single modifier
    if len(modifier_targets) == 1:
        if args.sequential_targets is not None:
            raise ValueError(
                f"Got sequential targets from both {type(modifier_targets[0][0])} "
                "and dataset arguments `sequential_targets`"
            )

        sequential_targets = modifier_targets[0][1]

    # if no modifiers, use data args
    else:
        sequential_targets = args.sequential_targets  # may be `None`

    # validate and infer
    match sequential_targets:
        case None:
            return get_no_split_params(model)
        case str():
            return [sequential_targets]
        case _:
            return sequential_targets

handle_sequential_oom

handle_sequential_oom(func)

Catch ooms and suggest changing sequential targets

Source code in llmcompressor/pipelines/sequential/helpers.py
def handle_sequential_oom(func):
    """Catch ooms and suggest changing sequential targets"""

    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except torch.cuda.OutOfMemoryError as e:
            raise torch.cuda.OutOfMemoryError(
                "Sequential pipeline ran out of memory. "
                "Please consider choosing a smaller module "
                "for `sequential_targets` argument, ex. 'Linear'"
            ) from e

    return wrapper

trace_subgraphs

trace_subgraphs(
    model: PreTrainedModel,
    sample_input: dict[str, Any],
    sequential_targets: list[str],
    ignore: list[str],
) -> list[Subgraph]

Trace a model to produce subgraphs, where each sequential target belongs to exactly one subgraph and where executing each subgraph in order is equivalent to executing the original model

Parameters:

  • model

    (PreTrainedModel) –

    model being traced

  • sample_input

    (dict[str, Any]) –

    inputs whose values will change during execution but whose len, bool, and contains values are assumed constant across batches

  • sequential_targets

    (list[str]) –

    list of patterns matching sequential targets

  • ignore

    (list[str]) –

    function and method names to skip during tracing

Returns:

  • list[Subgraph]

    a list of Subgraphs in order of execution

Source code in llmcompressor/pipelines/sequential/helpers.py
def trace_subgraphs(
    model: PreTrainedModel,
    sample_input: dict[str, Any],
    sequential_targets: list[str],
    ignore: list[str],
) -> list[Subgraph]:
    """
    Trace a model to produce subgraphs, where each sequential target belongs to exactly
    one subgraph and where executing each subgraph in order is equivalent to executing
    the original model

    :param model: model being traced
    :param sample_input: inputs whose values will change during execution but whose
        __len__, __bool__, and __contains__ values are assumed constant across batches
    :param sequential_targets: list of patterns matching sequential targets
    :param ignore: function and method names to skip during tracing
    :return: a list of Subgraphs in order of execution
    """
    # find modules
    targets = match_modules(model, sequential_targets)
    ancestors = get_sequential_ancestors(model, targets)
    offloaded = set()  # TODO: cleanup logic

    # initialize arguments
    tracer = SequentialTracer(ancestors, offloaded)
    concrete_args = populate_concrete_args(model, sample_input)

    with contextlib.ExitStack() as stack:
        # calibration context
        stack.enter_context(calibration_forward_context(model))
        stack.enter_context(HooksMixin.disable_hooks())

        # flags useful for tracing
        stack.enter_context(patch_attr(model.config, "_attn_implementation", "eager"))
        stack.enter_context(patch_attr(torch.compiler, "_is_compiling_flag", True))

        # autowrap forwards
        stack.enter_context(autowrap_forwards(ancestors, ignore))

        # avoid bug where pytorch cannot handle wrapped root functions
        unwrapped = inspect.unwrap(model.forward).__get__(model)
        stack.enter_context(patch_attr(model, "forward", unwrapped))
        stack.enter_context(patch_attr(type(model), "forward", unwrapped.__func__))
        assert isinstance(model.forward, MethodType)
        assert isinstance(type(model).forward, FunctionType)

        # avoid device movement during tracing
        stack.enter_context(disable_onloading())

        with append_autowrap_source_on_fail():
            graph = GraphModule(
                model,
                tracer.trace(
                    model,
                    dummy_inputs=sample_input,
                    concrete_args=concrete_args,
                    complete_concrete_args_with_inputs_not_in_dummy_inputs=False,
                    # bug in trace throws an error for variadic
                    # args and kwargs in function signature
                ),
            )

    # copy metadata
    graph.config = model.config
    graph.class_for_deserialization = model.__class__
    graph.device = model.device

    # perform subgraph partition
    partitions = topological_partition(graph, targets)
    subgraphs = partition_graph(model, partitions)
    trace_consumed_names(subgraphs)

    # As currently implemented, `topological_partition` generates an extra subgraph at
    # the beginning which does not contain a target. This adds a little more runtime,
    # and could be folded into the first subgraph in the future
    if len(subgraphs) != len(targets) + 1:
        logger.warning(
            f"Expected {len(targets)} subgraphs, but only traced {len(subgraphs)}. "
            "This is likely due to having wrapped code which calls sequential targets"
        )

    return subgraphs