Skip to content

pipeline

ls_mlkit.pipeline

BaseCallback

Source code in `src/ls_mlkit/pipeline/callback.py` (lines 33–42)
class BaseCallback(metaclass=ABCMeta):
    """Abstract interface for objects that react to pipeline events.

    Concrete callbacks implement :meth:`on_event`. ``CallbackManager.trigger``
    calls it once per registered callback, with the event and whatever extra
    positional/keyword arguments were passed to ``trigger``.
    """

    @abstractmethod
    def on_event(self, event: CallbackEvent, *args, **kwargs):
        """Handle a single pipeline event.

        Args:
            event (CallbackEvent): the event being dispatched
            *args: positional arguments forwarded by the dispatcher
            **kwargs: keyword arguments forwarded by the dispatcher
        """

on_event(event, *args, **kwargs) abstractmethod

On event

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `event` | `CallbackEvent` | the event to trigger | *required* |
| `*args` | | the arguments to pass to the callback | `()` |
| `**kwargs` | | the keyword arguments to pass to the callback | `{}` |
Source code in `src/ls_mlkit/pipeline/callback.py` (lines 34–42)
@abstractmethod
def on_event(self, event: CallbackEvent, *args, **kwargs):
    """Handle a single pipeline event (must be implemented by subclasses).

    Args:
        event (CallbackEvent): the event being dispatched
        *args: positional arguments forwarded by the dispatcher
        **kwargs: keyword arguments forwarded by the dispatcher
    """

CallbackManager

Source code in `src/ls_mlkit/pipeline/callback.py` (lines 45–76)
class CallbackManager:
    """Holds an ordered list of callbacks and dispatches events to them."""

    def __init__(self):
        # Callbacks are invoked by ``trigger`` in registration order.
        self.callbacks: List[BaseCallback] = []

    def add_callback(self, callback: BaseCallback):
        """Add a callback

        A ``None`` argument is ignored, so optional callbacks can be passed
        through unconditionally.

        Args:
            callback (BaseCallback): the callback to add
        """
        if callback is not None:
            self.callbacks.append(callback)

    def add_callbacks(self, callbacks: Optional[List[BaseCallback]]):
        """Add a list of callbacks

        ``None`` or an empty collection is a no-op. ``None`` entries inside the
        collection are skipped, matching :meth:`add_callback`; otherwise they
        would be registered and fail later in :meth:`trigger`. Any iterable of
        callbacks is accepted.

        Args:
            callbacks (Optional[List[BaseCallback]]): the callbacks to add
        """
        if not callbacks:
            return
        self.callbacks.extend(callback for callback in callbacks if callback is not None)

    def trigger(self, event: CallbackEvent, *args, **kwargs):
        """Trigger all callbacks for a given event

        Callbacks are called in registration order. An exception raised by one
        callback propagates immediately, so later callbacks are not called.

        Args:
            event (CallbackEvent): the event to trigger
            *args: the arguments to pass to the callback
            **kwargs: the keyword arguments to pass to the callback
        """
        for callback in self.callbacks:
            callback.on_event(event, *args, **kwargs)

add_callback(callback)

Add a callback

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `callback` | `BaseCallback` | the callback to add | *required* |
Source code in `src/ls_mlkit/pipeline/callback.py` (lines 49–56)
def add_callback(self, callback: BaseCallback):
    """Register a single callback; a ``None`` argument is silently ignored.

    Args:
        callback (BaseCallback): the callback to add
    """
    if callback is None:
        return
    self.callbacks.append(callback)

add_callbacks(callbacks)

Add a list of callbacks

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `callbacks` | `Optional[List[BaseCallback]]` | the callbacks to add | *required* |
Source code in `src/ls_mlkit/pipeline/callback.py` (lines 58–65)
def add_callbacks(self, callbacks: Optional[List[BaseCallback]]):
    """Add a list of callbacks

    ``None`` or an empty collection is a no-op. ``None`` entries inside the
    collection are skipped, matching ``add_callback``; otherwise they would be
    registered and fail later when ``trigger`` calls ``on_event`` on them. Any
    iterable of callbacks is accepted.

    Args:
        callbacks (Optional[List[BaseCallback]]): the callbacks to add
    """
    if not callbacks:
        return
    self.callbacks.extend(callback for callback in callbacks if callback is not None)

trigger(event, *args, **kwargs)

Trigger all callbacks for a given event

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `event` | `CallbackEvent` | the event to trigger | *required* |
| `*args` | | the arguments to pass to the callback | `()` |
| `**kwargs` | | the keyword arguments to pass to the callback | `{}` |
Source code in `src/ls_mlkit/pipeline/callback.py` (lines 67–76)
def trigger(self, event: CallbackEvent, *args, **kwargs):
    """Dispatch ``event`` to every registered callback, in registration order.

    Args:
        event (CallbackEvent): the event being dispatched
        *args: positional arguments handed to each ``on_event`` call
        **kwargs: keyword arguments handed to each ``on_event`` call
    """
    for registered in self.callbacks:
        registered.on_event(event, *args, **kwargs)

DistributedPipeline

Bases: BasePipeline

Source code in `src/ls_mlkit/pipeline/distributed_pipeline.py` (lines 91–296)
@inherit_docstrings
class DistributedPipeline(BasePipeline):
    """Training pipeline driven by an ``accelerate.Accelerator``.

    After ``BasePipeline`` has set up the model, optimizer, dataloader and
    scheduler, they are wrapped with ``accelerator.prepare``. Gradient
    accumulation is done manually with ``accelerator.no_sync``. Checkpoints are
    written with ``accelerator.save_state`` and restored with
    ``accelerator.load_state``. ``trigger_callbacks`` adds
    ``accelerator=self.accelerator`` to every event it forwards to the base
    implementation.
    """

    def __init__(
        self,
        model: torch.nn.Module,
        dataset: Union[torch.utils.data.Dataset, datasets.Dataset],
        optimizers: Tuple[torch.optim.Optimizer, torch.optim.lr_scheduler.LRScheduler],
        training_config: DistributedTrainingConfig,
        log_config: LogConfig,
        logger: logging.Logger | None,
        collate_fn: Optional[Callable] = None,
        seed: int = 42,
        callbacks: Optional[List[BaseCallback]] = None,
        **kwargs,
    ):
        """Initialize the DistributedPipeline

        Args:
            model (torch.nn.Module): the model to train
            dataset (Union[torch.utils.data.Dataset, datasets.Dataset]): the dataset to train on
            optimizers (Tuple[torch.optim.Optimizer, torch.optim.lr_scheduler.LRScheduler]): the optimizer and
                learning-rate scheduler to use for training
            training_config (DistributedTrainingConfig): the training configuration
            log_config (LogConfig): the logging configuration
            logger (logging.Logger | None): the logger to use for logging; must not be None on the local main process
            collate_fn (Optional[Callable], optional): the collate function to use for the dataset. Defaults to None.
            seed (int, optional): the seed to use for the random number generator. Defaults to 42.
            callbacks (Optional[List[BaseCallback]], optional): callbacks to register. Defaults to None.
            **kwargs: extra keyword arguments forwarded to ``BasePipeline.__init__``.
        """
        accelerate.utils.set_seed(seed)
        # Created before BasePipeline.__init__ so that self.accelerator is already
        # available to the overridden trigger_callbacks if the base constructor
        # triggers any event (not visible here -- TODO confirm).
        self.accelerator = Accelerator(
            gradient_accumulation_steps=training_config.gradient_accumulation_steps,
            mixed_precision=training_config.mixed_precision,
            kwargs_handlers=[
                DistributedDataParallelKwargs(find_unused_parameters=training_config.find_unused_parameters),
            ],
        )

        # load_checkpoint=False: loading is deferred until after accelerator.prepare below.
        super().__init__(
            model=model,
            dataset=dataset,
            optimizers=optimizers,
            training_config=training_config,
            log_config=log_config,
            collate_fn=collate_fn,
            logger=logger,
            callbacks=callbacks,
            load_checkpoint=False,
            **kwargs,
        )

        # Prepare everything for distributed training BEFORE loading
        self.model, self.optimizer, self.dataloader, self.scheduler = self.accelerator.prepare(
            self.model, self.optimizer, self.dataloader, self.scheduler
        )

        # Load checkpoint AFTER prepare to ensure proper state restoration
        if self.training_config.save_dir is not None and self.training_config.save_dir != "":
            self.load()
            # load() may replace training_config with the checkpointed one; the
            # configuration passed by the caller takes precedence.
            self.training_config = training_config

        if self.accelerator.is_local_main_process:
            assert self.logger is not None, f"Error from {self.__class__.__name__}: logger is required"
            self.logger.info("Using distributed training with accelerate")
            self.logger.info(f"Number of processes: {self.accelerator.num_processes}")
            self.logger.info(f"Current device: {self.accelerator.device}")

    def gradient_clip(self) -> None:
        """Clip gradients according to ``training_config.grad_clip_strategy``.

        ``"norm"`` clips the gradient norm (L2) to ``max_grad_norm``. ``"value"``
        clips by value to ``max_grad_value``. Any other strategy (e.g. ``None``)
        leaves gradients untouched. The accelerator's clipping helpers are used
        instead of ``torch.nn.utils`` directly.
        """
        model = self.model
        strategy = self.training_config.grad_clip_strategy
        if strategy == "norm":
            self.accelerator.clip_grad_norm_(
                model.parameters(),
                max_norm=self.training_config.max_grad_norm,
                norm_type=2,
            )
        elif strategy == "value":
            self.accelerator.clip_grad_value_(
                model.parameters(),
                clip_value=self.training_config.max_grad_value,
            )

    def train_a_step(self, batch: Dict[str, Any]):
        """Run one forward/backward micro-step, plus an optimizer update on accumulation boundaries.

        Every ``gradient_accumulation_steps``-th call (counted from
        ``training_state.current_global_step``) clips gradients, steps the
        optimizer and then the LR scheduler (if any). Other calls only
        accumulate gradients, inside ``accelerator.no_sync``.

        Args:
            batch (Dict[str, Any]): the batch passed to ``compute_loss``

        Returns:
            Dict[str, Any]: ``loss`` (float), ``lr`` (current learning rate of the
            first parameter group) and ``global_step``.
        """
        self.trigger_callbacks(event=CallbackEvent.STEP_START, batch=batch)
        model: torch.nn.Module = self.model
        optimizer = self.optimizer
        scheduler = self.scheduler
        logger = self.logger
        model.train()

        should_sync = (
            self.training_state.current_global_step + 1
        ) % self.training_config.gradient_accumulation_steps == 0
        # Intermediate micro-steps skip the cross-process gradient sync.
        ctx = self.accelerator.no_sync(model=model) if not should_sync else nullcontext()
        with ctx:
            raw_loss = self.compute_loss(model, batch)
            if isinstance(raw_loss, torch.Tensor):
                loss = raw_loss
            else:
                loss = cast(torch.Tensor, cast(Dict[str, Any], raw_loss)["loss"])
            self.trigger_callbacks(event=CallbackEvent.PRE_BACKWARD)
            self.accelerator.backward(loss)
            self.trigger_callbacks(event=CallbackEvent.POST_BACKWARD)
        if should_sync:
            self.gradient_clip()
            self.trigger_callbacks(event=CallbackEvent.PRE_OPTIMIZER_STEP)
            optimizer.step()
            self.trigger_callbacks(event=CallbackEvent.POST_OPTIMIZER_STEP)
            optimizer.zero_grad()
            # Step the LR schedule together with the optimizer. Stepping on every
            # micro-batch would advance it gradient_accumulation_steps times per
            # parameter update.
            if scheduler is not None:
                scheduler.step()

        loss_value = loss.item()
        # Without a scheduler, report the optimizer's own learning rate instead of
        # crashing on scheduler.get_last_lr().
        current_lr = scheduler.get_last_lr()[0] if scheduler is not None else optimizer.param_groups[0]["lr"]
        result = {
            "loss": loss_value,
            "lr": current_lr,
            "global_step": self.training_state.current_global_step,
        }

        # Only log on local main process
        if self._can_log(flag="steps") and self.accelerator.is_local_main_process and logger is not None:
            logger.info(
                f"[Training] Epoch {self.training_state.current_epoch}, Step {self.training_state.current_step_in_epoch}, Loss {loss_value}"
            )
            wandb.log(result, step=self.training_state.current_global_step)

        self.trigger_callbacks(event=CallbackEvent.STEP_END, batch=batch)
        return result

    def save(self) -> None:
        """Save a checkpoint of the current training state.

        ``PRE_SAVE`` and ``POST_SAVE`` are triggered as a pair on every process,
        including when nothing is written. Callbacks that do work in
        ``PRE_SAVE`` and undo it in ``POST_SAVE`` therefore always see both.
        Only the main process writes files. If writing raises, the exception
        propagates and ``POST_SAVE`` is not triggered.
        """
        self.trigger_callbacks(event=CallbackEvent.PRE_SAVE)
        if self.accelerator.is_main_process:
            self._write_checkpoint_on_main_process()
        self.trigger_callbacks(event=CallbackEvent.POST_SAVE)

    def _write_checkpoint_on_main_process(self) -> None:
        """Write the checkpoint for the current step under ``training_config.save_dir``.

        The state is written to a ``tmp_``-prefixed directory that is renamed
        into place only after every file was saved. A failed save never leaves
        a directory under the final checkpoint name. Does nothing when
        ``save_dir`` is unset or a checkpoint with the same name already exists.
        """
        save_dir = self.training_config.save_dir
        if save_dir is None or save_dir == "":
            return
        os.makedirs(save_dir, exist_ok=True)

        epoch = self.training_state.current_epoch
        step = self.training_state.current_step_in_epoch
        global_step = self.training_state.current_global_step
        checkpoint_name = self._get_checkpoint_name(epoch, step, global_step)
        temp_checkpoint_dir = os.path.join(save_dir, f"tmp_{checkpoint_name}")
        final_checkpoint_dir = os.path.join(save_dir, checkpoint_name)
        if os.path.exists(final_checkpoint_dir):
            return

        os.makedirs(temp_checkpoint_dir, exist_ok=True)
        try:
            # Save accelerator state (this includes model, optimizer, and scheduler).
            # NOTE(review): save_state is only called on the main process here; sharded
            # backends (FSDP/DeepSpeed) may require it on every process -- confirm if used.
            self.accelerator.save_state(temp_checkpoint_dir)

            # Save training metadata separately
            for base_name in [
                "training_state",
                "training_config",
                "log_config",
            ]:
                file_path = os.path.join(temp_checkpoint_dir, f"{base_name}.pth")
                torch.save(getattr(self, base_name), file_path)

            os.rename(temp_checkpoint_dir, final_checkpoint_dir)
            self._cleanup_old_checkpoints(save_dir=save_dir)
            if self.accelerator.is_local_main_process and self.logger is not None:
                self.logger.info(f"Model saved to {final_checkpoint_dir}")

        except Exception as e:
            if self.accelerator.is_local_main_process and self.logger is not None:
                self.logger.error(f"Failed to save checkpoint: {e}")
            shutil.rmtree(temp_checkpoint_dir, ignore_errors=True)
            raise

    def load(self) -> None:
        """Restore the latest checkpoint, if one exists, into the prepared objects and metadata.

        ``PRE_LOAD`` and ``POST_LOAD`` are always triggered as a pair, even when
        no (non-empty) checkpoint directory is found. Metadata files that are
        missing from the checkpoint are skipped with an info message.
        """
        self.trigger_callbacks(event=CallbackEvent.PRE_LOAD)
        checkpoint_dir = self.get_latest_checkpoint_dir()
        if checkpoint_dir is not None and len(os.listdir(checkpoint_dir)) > 0:
            # weights_only=False unpickles arbitrary objects: only load trusted checkpoints.
            self.accelerator.load_state(checkpoint_dir, load_kwargs={"weights_only": False})

            # Load training metadata
            for base_name in ["training_state", "training_config", "log_config"]:
                file_path = os.path.join(checkpoint_dir, f"{base_name}.pth")
                if not os.path.exists(file_path):
                    if self.accelerator.is_main_process and self.logger is not None:
                        self.logger.info(f"File {file_path} does not exist")
                    continue
                setattr(self, base_name, torch.load(file_path, weights_only=False))

            if self.accelerator.is_main_process and self.logger is not None:
                self.logger.info(f"Model loaded from {checkpoint_dir}")
        self.trigger_callbacks(event=CallbackEvent.POST_LOAD)

    def trigger_callbacks(self, event: CallbackEvent, **kwargs):
        """Trigger all callbacks for a given event

        Adds ``accelerator=self.accelerator`` to the keyword arguments passed to
        the base implementation. Supplying ``accelerator`` in ``kwargs`` raises
        a TypeError.

        Args:
            event (CallbackEvent): the event to trigger
            **kwargs: the keyword arguments to pass to the callback
        """
        super().trigger_callbacks(
            event=event,
            accelerator=self.accelerator,
            **kwargs,
        )

__init__(model, dataset, optimizers, training_config, log_config, logger, collate_fn=None, seed=42, callbacks=None, **kwargs)

Initialize the DistributedPipeline

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model` | `Module` | the model to train | *required* |
| `dataset` | `Union[Dataset, Dataset]` | the dataset to train on | *required* |
| `optimizers` | `Tuple[Optimizer, LRScheduler]` | the optimizer and learning-rate scheduler to use for training | *required* |
| `training_config` | `DistributedTrainingConfig` | the training configuration | *required* |
| `log_config` | `LogConfig` | the logging configuration | *required* |
| `logger` | `Logger` | the logger to use for logging | *required* |
| `collate_fn` | `Optional[Callable]` | the collate function to use for the dataset. Defaults to None. | `None` |
| `seed` | `int` | the seed to use for the random number generator. Defaults to 42. | `42` |
Source code in `src/ls_mlkit/pipeline/distributed_pipeline.py` (lines 93–154)
def __init__(
    self,
    model: torch.nn.Module,
    dataset: Union[torch.utils.data.Dataset, datasets.Dataset],
    optimizers: Tuple[torch.optim.Optimizer, torch.optim.lr_scheduler.LRScheduler],
    training_config: DistributedTrainingConfig,
    log_config: LogConfig,
    logger: logging.Logger | None,
    collate_fn: Optional[Callable] = None,
    seed: int = 42,
    callbacks: Optional[List[BaseCallback]] = None,
    **kwargs,
):
    """Seed RNGs, build the Accelerator, set up the base pipeline, wrap it, then resume.

    Args:
        model (torch.nn.Module): the model to train
        dataset (Union[torch.utils.data.Dataset, datasets.Dataset]): the dataset to train on
        optimizers (Tuple[torch.optim.Optimizer, torch.optim.lr_scheduler.LRScheduler]): the optimizer and
            learning-rate scheduler to use for training
        training_config (DistributedTrainingConfig): the training configuration
        log_config (LogConfig): the logging configuration
        logger (logging.Logger | None): the logger; must not be None on the local main process
        collate_fn (Optional[Callable], optional): the collate function for the dataloader. Defaults to None.
        seed (int, optional): the seed passed to ``accelerate.utils.set_seed``. Defaults to 42.
        callbacks (Optional[List[BaseCallback]], optional): callbacks to register. Defaults to None.
        **kwargs: extra keyword arguments forwarded to ``BasePipeline.__init__``.
    """
    accelerate.utils.set_seed(seed)
    ddp_handler = DistributedDataParallelKwargs(find_unused_parameters=training_config.find_unused_parameters)
    self.accelerator = Accelerator(
        gradient_accumulation_steps=training_config.gradient_accumulation_steps,
        mixed_precision=training_config.mixed_precision,
        kwargs_handlers=[ddp_handler],
    )

    # Checkpoint loading is disabled here and done manually after prepare().
    super().__init__(
        model=model,
        dataset=dataset,
        optimizers=optimizers,
        training_config=training_config,
        log_config=log_config,
        collate_fn=collate_fn,
        logger=logger,
        callbacks=callbacks,
        load_checkpoint=False,
        **kwargs,
    )

    # Wrap for distributed execution first; the checkpoint is restored into the wrapped objects.
    prepared = self.accelerator.prepare(self.model, self.optimizer, self.dataloader, self.scheduler)
    self.model, self.optimizer, self.dataloader, self.scheduler = prepared

    resume_dir = self.training_config.save_dir
    if resume_dir is not None and resume_dir != "":
        self.load()
        # Keep the caller's configuration rather than the one restored by load().
        self.training_config = training_config

    if not self.accelerator.is_local_main_process:
        return
    assert self.logger is not None, f"Error from {self.__class__.__name__}: logger is required"
    accel = self.accelerator
    self.logger.info("Using distributed training with accelerate")
    self.logger.info(f"Number of processes: {accel.num_processes}")
    self.logger.info(f"Current device: {accel.device}")

trigger_callbacks(event, **kwargs)

Trigger all callbacks for a given event

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `event` | `CallbackEvent` | the event to trigger | *required* |
| `**kwargs` | | the keyword arguments to pass to the callback | `{}` |
Source code in `src/ls_mlkit/pipeline/distributed_pipeline.py` (lines 285–296)
def trigger_callbacks(self, event: CallbackEvent, **kwargs):
    """Forward ``event`` to the base implementation, adding ``accelerator=self.accelerator``.

    Passing ``accelerator`` yourself in ``kwargs`` raises a TypeError (duplicate keyword).

    Args:
        event (CallbackEvent): the event being dispatched
        **kwargs: extra keyword arguments forwarded alongside the accelerator
    """
    accel = self.accelerator
    super().trigger_callbacks(event=event, accelerator=accel, **kwargs)

DistributedTrainingConfig

Bases: TrainingConfig

Source code in `src/ls_mlkit/pipeline/distributed_pipeline.py` (lines 19–88)
@inherit_docstrings
class DistributedTrainingConfig(TrainingConfig):
    """``TrainingConfig`` extended with the options used to build an ``accelerate.Accelerator``."""

    def __init__(
        self,
        n_epochs: int = 100,
        batch_size: int = 4,
        device: str = "cuda",
        save_strategy: Literal["epochs", "steps", None] = "epochs",
        save_dir: str | None = None,
        save_steps: int = 10,
        save_epochs: int = 1,
        save_total_limit: int = 5,
        num_workers: int = 4,
        train_shuffle: bool = True,
        eval_strategy: Literal["epochs", "steps"] | None = None,
        eval_steps: int = 500,
        eval_epochs: int = 1,
        grad_clip_strategy: Literal["norm", "value", None] = "norm",
        max_grad_norm: float = 1.0,
        max_grad_value: float = 1.0,
        gradient_accumulation_steps: int = 1,
        mixed_precision: str = "fp16",
        find_unused_parameters: bool = False,
        **kwargs,
    ):
        """Initialize the DistributedTrainingConfig

        Args:
            n_epochs (int, optional): the number of epochs. Defaults to 100.
            batch_size (int, optional): the batch size. Defaults to 4.
            device (str, optional): the device to use for training. Defaults to "cuda".
            save_strategy (Literal["epochs", "steps", None], optional): whether and when to save the model.
                Defaults to "epochs".
            save_dir (str | None, optional): the directory to save the model. Defaults to None.
            save_steps (int, optional): the step interval between saves. Defaults to 10.
            save_epochs (int, optional): the epoch interval between saves. Defaults to 1.
            save_total_limit (int, optional): the maximum number of checkpoints to keep. Defaults to 5.
            num_workers (int, optional): the number of workers to use for data loading. Defaults to 4.
            train_shuffle (bool, optional): whether to shuffle the training data. Defaults to True.
            eval_strategy (Literal["epochs", "steps"] | None, optional): whether and when to evaluate the model.
                Defaults to None.
            eval_steps (int, optional): the step interval between evaluations. Defaults to 500.
            eval_epochs (int, optional): the epoch interval between evaluations. Defaults to 1.
            grad_clip_strategy (Literal["norm", "value", None], optional): whether and how to clip gradients.
                Defaults to "norm".
            max_grad_norm (float, optional): the maximum gradient norm when clipping by norm. Defaults to 1.0.
            max_grad_value (float, optional): the maximum gradient value when clipping by value. Defaults to 1.0.
            gradient_accumulation_steps (int, optional): the number of micro-steps to accumulate before each
                optimizer update. Defaults to 1.
            mixed_precision (str, optional): the mixed-precision mode handed to ``accelerate.Accelerator``.
                Defaults to "fp16".
            find_unused_parameters (bool, optional): forwarded to DDP through ``DistributedDataParallelKwargs``.
                Defaults to False.
            **kwargs: extra options forwarded to ``TrainingConfig.__init__``.
        """
        base_options = dict(
            n_epochs=n_epochs,
            batch_size=batch_size,
            device=device,
            save_strategy=save_strategy,
            save_dir=save_dir,
            save_steps=save_steps,
            save_epochs=save_epochs,
            save_total_limit=save_total_limit,
            num_workers=num_workers,
            train_shuffle=train_shuffle,
            eval_strategy=eval_strategy,
            eval_steps=eval_steps,
            eval_epochs=eval_epochs,
            grad_clip_strategy=grad_clip_strategy,
            max_grad_norm=max_grad_norm,
            max_grad_value=max_grad_value,
            gradient_accumulation_steps=gradient_accumulation_steps,
        )
        super().__init__(**base_options, **kwargs)
        # Distributed-only options, read when the pipeline builds its Accelerator.
        self.mixed_precision = mixed_precision
        self.find_unused_parameters = find_unused_parameters

__init__(n_epochs=100, batch_size=4, device='cuda', save_strategy='epochs', save_dir=None, save_steps=10, save_epochs=1, save_total_limit=5, num_workers=4, train_shuffle=True, eval_strategy=None, eval_steps=500, eval_epochs=1, grad_clip_strategy='norm', max_grad_norm=1.0, max_grad_value=1.0, gradient_accumulation_steps=1, mixed_precision='fp16', find_unused_parameters=False, **kwargs)

Initialize the DistributedTrainingConfig

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `n_epochs` | `int` | the number of epochs. Defaults to 100. | `100` |
| `batch_size` | `int` | the batch size. Defaults to 4. | `4` |
| `device` | `str` | the device to use for training. Defaults to "cuda". | `'cuda'` |
| `save_strategy` | `Literal["epochs", "steps", None]` | the strategy determines whether to save the model and when to save it. Defaults to "epochs". | `'epochs'` |
| `save_dir` | `str` | the directory to save the model. Defaults to None. | `None` |
| `save_steps` | `int` | the number of steps to save the model. Defaults to 10. | `10` |
| `save_epochs` | `int` | the number of epochs to save the model. Defaults to 1. | `1` |
| `save_total_limit` | `int` | the maximum number of checkpoints to save. Defaults to 5. | `5` |
| `num_workers` | `int` | the number of workers to use for data loading. Defaults to 4. | `4` |
| `train_shuffle` | `bool` | whether to shuffle the training data. Defaults to True. | `True` |
| `eval_strategy` | `Literal["epochs", "steps"]` | the strategy determines whether to evaluate the model and when to evaluate it. Defaults to None. | `None` |
| `eval_steps` | `int` | the number of steps to evaluate the model. Defaults to 500. | `500` |
| `eval_epochs` | `int` | the number of epochs to evaluate the model. Defaults to 1. | `1` |
| `grad_clip_strategy` | `Literal["norm", "value", None]` | the strategy determines whether to clip the gradient and how to clip it. Defaults to "norm". | `'norm'` |
| `max_grad_norm` | `float` | the maximum gradient norm to clip the gradient. Defaults to 1.0. | `1.0` |
| `max_grad_value` | `float` | the maximum gradient value to clip the gradient. Defaults to 1.0. | `1.0` |
| `gradient_accumulation_steps` | `int` | the number of steps to accumulate gradients before updating the model. Defaults to 1. | `1` |
| `mixed_precision` | `str` | the mixed precision to use for training. Defaults to "fp16". | `'fp16'` |
Source code in `src/ls_mlkit/pipeline/distributed_pipeline.py` (lines 21–88)
def __init__(
    self,
    n_epochs: int = 100,
    batch_size: int = 4,
    device: str = "cuda",
    save_strategy: Literal["epochs", "steps", None] = "epochs",
    save_dir: str | None = None,
    save_steps: int = 10,
    save_epochs: int = 1,
    save_total_limit: int = 5,
    num_workers: int = 4,
    train_shuffle: bool = True,
    eval_strategy: Literal["epochs", "steps"] | None = None,
    eval_steps: int = 500,
    eval_epochs: int = 1,
    grad_clip_strategy: Literal["norm", "value", None] = "norm",
    max_grad_norm: float = 1.0,
    max_grad_value: float = 1.0,
    gradient_accumulation_steps: int = 1,
    mixed_precision: str = "fp16",
    find_unused_parameters: bool = False,
    **kwargs,
):
    """Initialize the DistributedTrainingConfig

    Args:
        n_epochs (int, optional): the number of epochs. Defaults to 100.
        batch_size (int, optional): the batch size. Defaults to 4.
        device (str, optional): the device to use for training. Defaults to "cuda".
        save_strategy (Literal["epochs", "steps", None], optional): whether and when to save the model.
            Defaults to "epochs".
        save_dir (str | None, optional): the directory to save the model. Defaults to None.
        save_steps (int, optional): the step interval between saves. Defaults to 10.
        save_epochs (int, optional): the epoch interval between saves. Defaults to 1.
        save_total_limit (int, optional): the maximum number of checkpoints to keep. Defaults to 5.
        num_workers (int, optional): the number of workers to use for data loading. Defaults to 4.
        train_shuffle (bool, optional): whether to shuffle the training data. Defaults to True.
        eval_strategy (Literal["epochs", "steps"] | None, optional): whether and when to evaluate the model.
            Defaults to None.
        eval_steps (int, optional): the step interval between evaluations. Defaults to 500.
        eval_epochs (int, optional): the epoch interval between evaluations. Defaults to 1.
        grad_clip_strategy (Literal["norm", "value", None], optional): whether and how to clip gradients.
            Defaults to "norm".
        max_grad_norm (float, optional): the maximum gradient norm when clipping by norm. Defaults to 1.0.
        max_grad_value (float, optional): the maximum gradient value when clipping by value. Defaults to 1.0.
        gradient_accumulation_steps (int, optional): the number of micro-steps to accumulate before each
            optimizer update. Defaults to 1.
        mixed_precision (str, optional): the mixed-precision mode handed to ``accelerate.Accelerator``.
            Defaults to "fp16".
        find_unused_parameters (bool, optional): forwarded to DDP through ``DistributedDataParallelKwargs``.
            Defaults to False.
        **kwargs: extra options forwarded to ``TrainingConfig.__init__``.
    """
    base_options = dict(
        n_epochs=n_epochs,
        batch_size=batch_size,
        device=device,
        save_strategy=save_strategy,
        save_dir=save_dir,
        save_steps=save_steps,
        save_epochs=save_epochs,
        save_total_limit=save_total_limit,
        num_workers=num_workers,
        train_shuffle=train_shuffle,
        eval_strategy=eval_strategy,
        eval_steps=eval_steps,
        eval_epochs=eval_epochs,
        grad_clip_strategy=grad_clip_strategy,
        max_grad_norm=max_grad_norm,
        max_grad_value=max_grad_value,
        gradient_accumulation_steps=gradient_accumulation_steps,
    )
    super().__init__(**base_options, **kwargs)
    # Distributed-only options, read when the pipeline builds its Accelerator.
    self.mixed_precision = mixed_precision
    self.find_unused_parameters = find_unused_parameters

BasePipeline

Source code in `src/ls_mlkit/pipeline/pipeline.py` (lines 126–488)
class BasePipeline(metaclass=ABCMeta):
    def __init__(
        self,
        model: torch.nn.Module,
        dataset: Union[torch.utils.data.Dataset, datasets.Dataset],
        optimizers: Tuple[torch.optim.Optimizer, torch.optim.lr_scheduler.LRScheduler],
        training_config: TrainingConfig,
        log_config: LogConfig,
        logger: logging.Logger | None,
        collate_fn: Optional[Callable] = None,
        callbacks: Optional[List[BaseCallback]] = None,
        load_checkpoint: bool = True,
        *args,
        **kwargs,
    ):
        """Initialize the BasePipeline

        Args:
            model (torch.nn.Module): the model to train
            dataset (Union[torch.utils.data.Dataset, datasets.Dataset]): the dataset to train on
            optimizers (Tuple[torch.optim.Optimizer, torch.optim.lr_scheduler.LRScheduler]): the (optimizer, scheduler)
                pair used for training; the scheduler may be None
            training_config (TrainingConfig): the training configuration
            log_config (LogConfig): the logging configuration
            logger (logging.Logger | None): the logger to use for logging
            collate_fn (Optional[Callable], optional): the collate function to use for the dataset. Defaults to None.
            callbacks (Optional[List[BaseCallback]], optional): callbacks to register. Defaults to None.
            load_checkpoint (bool, optional): resume from the latest checkpoint in ``training_config.save_dir``
                when that directory is set. Defaults to True.
        """
        super().__init__()
        # callbacks
        self.callback_manager = CallbackManager()
        self.callback_manager.add_callbacks(callbacks=callbacks or [])

        # model, dataset, optimizers, log, training,
        self.model = model
        self.dataset = dataset
        self.optimizer, self.scheduler = optimizers
        self.training_config = training_config
        self.log_config = log_config
        self.training_state = TrainingState()
        self.logger = logger

        self.dataloader = torch.utils.data.DataLoader(
            cast(torch.utils.data.Dataset, self.dataset),
            batch_size=self.training_config.batch_size,
            shuffle=self.training_config.train_shuffle,
            collate_fn=collate_fn,
            num_workers=self.training_config.num_workers,
        )

        if load_checkpoint and self.training_config.save_dir is not None and self.training_config.save_dir != "":
            self.load()
            # Keep the caller-supplied training config rather than the one restored from the checkpoint.
            self.training_config = training_config

    @abstractmethod
    def compute_loss(self, model: torch.nn.Module, batch: dict) -> Tensor | dict:
        """Compute the loss

        Args:
            model (torch.nn.Module): the model to train
            batch (dict): the batch of data

        Returns:
            Tensor | dict: loss Tensor or a dictionary containing the loss key
        """

    def train(self) -> None:
        """Train the model for ``training_config.n_epochs`` epochs.

        Epochs already completed according to ``training_state.current_epoch`` (e.g. after
        resuming from a checkpoint) are skipped. A final checkpoint is saved at the end.

        Returns:
            None
        """
        self.trigger_callbacks(event=CallbackEvent.TRAINING_START)
        n_epochs = self.training_config.n_epochs

        for epoch in tqdm(range(n_epochs), desc="training", mininterval=0):
            if epoch < self.training_state.current_epoch:
                continue
            self.train_an_epoch()
            # Decide eval/save while ``current_epoch`` still indexes the epoch that just finished:
            # the ``_can_*`` helpers add 1 themselves (as ``_can_log`` relies on in train_an_epoch).
            # Checking after the increment shifted the schedule by one epoch.
            should_eval = self._can_eval(flag="epochs")
            should_save = self._can_save(flag="epochs")
            # Advance before saving so a resumed run starts at the next epoch.
            self.training_state.current_epoch += 1

            if should_eval:
                self.eval()
            if should_save:
                self.save()
        self.save()
        self.trigger_callbacks(event=CallbackEvent.TRAINING_END)

    def train_an_epoch(self):
        """Train the model for one epoch

        Steps before ``training_state.current_step_in_epoch`` are skipped so a run resumed
        mid-epoch continues where it stopped.

        Returns:
            None
        """
        self.trigger_callbacks(event=CallbackEvent.EPOCH_START)
        result = None
        for step, batch in enumerate(self.dataloader):
            if step < self.training_state.current_step_in_epoch:
                continue
            result = self.train_a_step(batch)

            if self._can_eval(flag="steps"):
                self.eval()
            # Check the save condition before the counters advance (``_can_save`` adds 1 itself),
            # but write the checkpoint afterwards so it records this step as done; otherwise a
            # resumed run would train this batch a second time.
            should_save = self._can_save(flag="steps")
            self.training_state.current_step_in_epoch += 1
            self.training_state.current_global_step += 1
            if should_save:
                self.save()

        # ``result`` stays None when the dataloader is empty or every step was done before a resume.
        if result is not None and self._can_log(flag="epochs") and self.logger is not None:
            self.logger.info(
                f"[Training] Epoch {self.training_state.current_epoch}, Step {self.training_state.current_step_in_epoch}, Loss {result['loss']}"
            )
            wandb.log(result, step=self.training_state.current_epoch)
        self.training_state.current_step_in_epoch = 0
        self.trigger_callbacks(event=CallbackEvent.EPOCH_END)

    def train_a_step(self, batch: Dict[str, Any]) -> dict:
        """Train the model for one step

        Tensor values of ``batch`` are moved to ``training_config.device`` in place. The loss is
        divided by ``gradient_accumulation_steps``; the optimizer and the LR scheduler step only
        on every ``gradient_accumulation_steps``-th global step.

        Args:
            batch (Dict[str, Any]): the batch of data

        Returns:
            dict: a dictionary containing the loss key (the accumulation-scaled loss value)
        """
        self.trigger_callbacks(event=CallbackEvent.STEP_START, batch=batch)
        model = self.model
        optimizer = self.optimizer
        scheduler = self.scheduler
        logger = self.logger
        device = self.training_config.device
        model.train()
        for key, value in batch.items():
            if isinstance(value, torch.Tensor):
                batch[key] = value.to(device)
        model = model.to(device)

        raw_loss = self.compute_loss(model, batch)
        if isinstance(raw_loss, Tensor):
            loss = raw_loss
        else:
            loss = cast(Tensor, cast(Dict[str, Any], raw_loss)["loss"])
        loss = loss / self.training_config.gradient_accumulation_steps
        self.trigger_callbacks(event=CallbackEvent.PRE_BACKWARD, loss=loss)
        loss.backward()
        self.trigger_callbacks(event=CallbackEvent.POST_BACKWARD, loss=loss)

        if ((self.training_state.current_global_step + 1) % self.training_config.gradient_accumulation_steps) == 0:
            self.gradient_clip()
            self.trigger_callbacks(event=CallbackEvent.PRE_OPTIMIZER_STEP)
            optimizer.step()
            self.trigger_callbacks(event=CallbackEvent.POST_OPTIMIZER_STEP)
            optimizer.zero_grad()
            # Advance the LR schedule once per optimizer update, not once per micro-batch.
            if scheduler is not None:
                scheduler.step()

        loss_value = loss.item()
        if self._can_log(flag="steps") and logger is not None:
            logger.info(
                f"[Training] Epoch {self.training_state.current_epoch}, Step {self.training_state.current_step_in_epoch}, Loss {loss_value}"
            )
        self.trigger_callbacks(event=CallbackEvent.STEP_END, batch=batch)
        return {
            "loss": loss_value,
        }

    def gradient_clip(self) -> None:
        """Clip the gradient in place according to ``training_config.grad_clip_strategy``.

        "norm" clips the total L2 norm to ``max_grad_norm``; "value" clamps each gradient
        element to ``max_grad_value``; any other value leaves gradients untouched.

        Returns:
            None
        """
        model = self.model
        if self.training_config.grad_clip_strategy == "norm":
            torch.nn.utils.clip_grad_norm_(
                model.parameters(),
                max_norm=self.training_config.max_grad_norm,
                norm_type=2,
                error_if_nonfinite=False,
            )
        if self.training_config.grad_clip_strategy == "value":
            torch.nn.utils.clip_grad_value_(
                model.parameters(),
                clip_value=self.training_config.max_grad_value,
            )

    def eval(self):
        """Evaluate the model (no-op hook in the base class)."""

    @staticmethod
    def _checkpoint_sort_key(name: str) -> Tuple[int, int]:
        """Order checkpoint directory names by (global step, epoch); a missing field counts as 0."""
        global_match = re.search(r"global(\d+)", name)
        epoch_match = re.search(r"epoch(\d+)", name)
        return (
            int(global_match.group(1)) if global_match is not None else 0,
            int(epoch_match.group(1)) if epoch_match is not None else 0,
        )

    def _cleanup_old_checkpoints(self, save_dir):
        """Delete the oldest ``checkpoint_*`` directories until at most ``save_total_limit`` remain.

        A ``save_total_limit`` of None keeps every checkpoint.
        """
        limit = self.training_config.save_total_limit
        if limit is None:
            return
        checkpoints = sorted(
            (d for d in os.listdir(save_dir) if d.startswith("checkpoint_")),
            key=self._checkpoint_sort_key,
        )

        while len(checkpoints) > limit:
            oldest_checkpoint = checkpoints.pop(0)
            oldest_path = os.path.join(save_dir, oldest_checkpoint)
            shutil.rmtree(oldest_path)
            if self.logger is not None:
                self.logger.info(f"Deleted old checkpoint: {oldest_path}")

    def _get_checkpoint_name(self, epoch, step, global_step):
        return f"checkpoint_epoch{epoch}_step{step}_global{global_step}"

    def _interval_reached(self, strategy, flag: Literal["epochs", "steps"], every_epochs, every_steps) -> bool:
        """Return True when ``flag`` is the configured ``strategy`` and its interval is reached.

        Counters are tested as ``counter + 1``: callers check *before* advancing the counter.
        """
        if strategy is None or flag != strategy:
            return False
        if flag == "epochs":
            return (self.training_state.current_epoch + 1) % every_epochs == 0
        if flag == "steps":
            return (self.training_state.current_global_step + 1) % every_steps == 0
        return False

    def _can_save(self, flag: Literal["epochs", "steps"]):
        if self.training_config.save_dir is None or self.training_config.save_dir == "":
            return False
        return self._interval_reached(
            self.training_config.save_strategy,
            flag,
            self.training_config.save_epochs,
            self.training_config.save_steps,
        )

    def _can_log(self, flag: Literal["epochs", "steps"]):
        if self.logger is None:
            return False
        if self.log_config.log_dir is None or self.log_config.log_dir == "":
            return False
        return self._interval_reached(
            self.log_config.log_strategy,
            flag,
            self.log_config.log_epochs,
            self.log_config.log_steps,
        )

    def _can_eval(self, flag: Literal["epochs", "steps"]):
        return self._interval_reached(
            self.training_config.eval_strategy,
            flag,
            self.training_config.eval_epochs,
            self.training_config.eval_steps,
        )

    def save(self) -> None:
        """Save the checkpoint

        Files are written to a ``tmp_``-prefixed directory first and then renamed, so an
        interrupted save never leaves a partial ``checkpoint_*`` directory. Nothing is written
        when ``save_dir`` is unset or a checkpoint with the same name already exists.

        Returns:
            None
        """
        self.trigger_callbacks(event=CallbackEvent.PRE_SAVE)
        save_dir = self.training_config.save_dir
        if save_dir is None or save_dir == "":
            return
        os.makedirs(save_dir, exist_ok=True)

        epoch = self.training_state.current_epoch
        step = self.training_state.current_step_in_epoch
        global_step = self.training_state.current_global_step
        checkpoint_name = self._get_checkpoint_name(epoch, step, global_step)
        temp_checkpoint_dir = os.path.join(save_dir, f"tmp_{checkpoint_name}")
        final_checkpoint_dir = os.path.join(save_dir, checkpoint_name)
        if os.path.exists(final_checkpoint_dir):
            return

        os.makedirs(temp_checkpoint_dir, exist_ok=True)
        try:
            for base_name in [
                "model",
                "optimizer",
                "scheduler",
                "training_state",
                "training_config",
                "log_config",
            ]:
                file_path = os.path.join(temp_checkpoint_dir, f"{base_name}.pth")
                torch.save(getattr(self, base_name), file_path)
            os.rename(temp_checkpoint_dir, final_checkpoint_dir)
            self._cleanup_old_checkpoints(save_dir=save_dir)
            if self.logger is not None:
                self.logger.info(f"Model saved to {final_checkpoint_dir}")

        except Exception as e:
            if self.logger is not None:
                self.logger.error(f"Failed to save checkpoint: {e}")
            shutil.rmtree(temp_checkpoint_dir, ignore_errors=True)
            raise
        self.trigger_callbacks(event=CallbackEvent.POST_SAVE)

    def load(self) -> None:
        """Load the latest checkpoint from ``training_config.save_dir`` (if any).

        ``model``, ``optimizer`` and ``scheduler`` are restored *into the existing objects* via
        ``load_state_dict``. Each file is pickled on its own, so assigning the unpickled optimizer
        directly would leave it holding copies of the parameters instead of ``self.model``'s
        (and the scheduler holding yet another optimizer), and training would silently stop
        updating the model. Other entries are assigned as-is.

        Returns:
            None
        """
        self.trigger_callbacks(event=CallbackEvent.PRE_LOAD)
        # check load condition ============================================================================
        checkpoint_dir = self.get_latest_checkpoint_dir()
        if checkpoint_dir is None or len(os.listdir(checkpoint_dir)) <= 0:
            return

        # load ============================================================================================

        for base_name in [
            "model",
            "optimizer",
            "scheduler",
            "training_state",
            "training_config",
            "log_config",
        ]:
            file_path = os.path.join(checkpoint_dir, f"{base_name}.pth")
            if not os.path.exists(file_path):
                if self.logger is not None:
                    self.logger.info(f"File {file_path} does not exist")
                continue
            # SECURITY NOTE: weights_only=False unpickles arbitrary objects; only load trusted checkpoints.
            loaded = torch.load(file_path, weights_only=False)
            current = getattr(self, base_name, None)
            if (
                base_name in ("model", "optimizer", "scheduler")
                and loaded is not None
                and current is not None
                and hasattr(current, "load_state_dict")
            ):
                # Accept both a pickled object (current save format) and a plain state dict.
                state = loaded.state_dict() if hasattr(loaded, "state_dict") else loaded
                current.load_state_dict(state)
            else:
                setattr(self, base_name, loaded)

        if self.logger is not None:
            self.logger.info(f"Model loaded from {checkpoint_dir}")
        self.trigger_callbacks(event=CallbackEvent.POST_LOAD)

    def get_latest_checkpoint_dir(self) -> str | None:
        """Return the path of the newest ``checkpoint_*`` directory in ``save_dir``, or None if there is none."""
        save_dir = self.training_config.save_dir
        if save_dir is None or save_dir == "" or not os.path.exists(save_dir):
            return None

        checkpoints = [d for d in os.listdir(save_dir) if d.startswith("checkpoint_")]
        if not checkpoints:
            return None
        return os.path.join(save_dir, max(checkpoints, key=self._checkpoint_sort_key))

    def add_callbacks(self, callbacks: List[BaseCallback] | None):
        """Add a list of callbacks

        Args:
            callbacks (List[BaseCallback] | None): the callbacks to add
        """
        self.callback_manager.add_callbacks(callbacks=callbacks)

    def trigger_callbacks(self, event: CallbackEvent, **kwargs):
        """Trigger all callbacks for a given event

        The pipeline's configs, state, logger, model, dataloader, optimizer and scheduler are
        always passed as keyword arguments, in addition to ``kwargs``.

        Args:
            event (CallbackEvent): the event to trigger
            **kwargs: the keyword arguments to pass to the callback
        """
        self.callback_manager.trigger(
            event=event,
            training_config=self.training_config,
            log_config=self.log_config,
            training_state=self.training_state,
            logger=self.logger,
            model=self.model,
            dataloader=self.dataloader,
            optimizer=self.optimizer,
            scheduler=self.scheduler,
            **kwargs,
        )

__init__(model, dataset, optimizers, training_config, log_config, logger, collate_fn=None, callbacks=None, load_checkpoint=True, *args, **kwargs)

Initialize the BasePipeline

Parameters:

Name Type Description Default
model Module

the model to train

required
dataset Union[torch.utils.data.Dataset, datasets.Dataset]

the dataset to train on

required
optimizers Tuple[Optimizer, LRScheduler]

the optimizers to use for training

required
training_config TrainingConfig

the training configuration

required
log_config LogConfig

the logging configuration

required
logger Logger | None

the logger to use for logging

required
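collate_fn Optional[Callable]

the collate function to use for the dataset. Defaults to None.

None
callbacks Optional[List[BaseCallback]]

callbacks to register with the pipeline's callback manager. Defaults to None.

None
load_checkpoint bool

whether to resume from the latest checkpoint in training_config.save_dir when save_dir is set. Defaults to True.

True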
Source code in src/ls_mlkit/pipeline/pipeline.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def __init__(
    self,
    model: torch.nn.Module,
    dataset: Union[torch.utils.data.Dataset, datasets.Dataset],
    optimizers: Tuple[torch.optim.Optimizer, torch.optim.lr_scheduler.LRScheduler],
    training_config: TrainingConfig,
    log_config: LogConfig,
    logger: logging.Logger | None,
    collate_fn: Optional[Callable] = None,
    callbacks: Optional[List[BaseCallback]] = None,
    load_checkpoint: bool = True,
    *args,
    **kwargs,
):
    """Set up the pipeline and optionally resume from the latest checkpoint.

    Registers the callbacks, stores the training components, builds the training
    DataLoader, and, when ``load_checkpoint`` is set and ``training_config.save_dir``
    is non-empty, restores state via ``load()``.

    Args:
        model (torch.nn.Module): the model to train
        dataset (Union[torch.utils.data.Dataset, datasets.Dataset]): the dataset to train on
        optimizers (Tuple[torch.optim.Optimizer, torch.optim.lr_scheduler.LRScheduler]): the (optimizer, scheduler) pair
        training_config (TrainingConfig): the training configuration
        log_config (LogConfig): the logging configuration
        logger (logging.Logger | None): the logger to use for logging
        collate_fn (Optional[Callable], optional): collate function for the DataLoader. Defaults to None.
        callbacks (Optional[List[BaseCallback]], optional): callbacks to register. Defaults to None.
        load_checkpoint (bool, optional): whether to resume from a checkpoint in ``save_dir``. Defaults to True.
    """
    super().__init__()

    manager = CallbackManager()
    manager.add_callbacks(callbacks=callbacks or [])
    self.callback_manager = manager

    self.model = model
    self.dataset = dataset
    optimizer, scheduler = optimizers
    self.optimizer = optimizer
    self.scheduler = scheduler
    self.training_config = training_config
    self.log_config = log_config
    self.training_state = TrainingState()
    self.logger = logger

    cfg = training_config
    self.dataloader = torch.utils.data.DataLoader(
        cast(torch.utils.data.Dataset, self.dataset),
        batch_size=cfg.batch_size,
        shuffle=cfg.train_shuffle,
        collate_fn=collate_fn,
        num_workers=cfg.num_workers,
    )

    save_dir = cfg.save_dir
    if not load_checkpoint or save_dir in (None, ""):
        return
    self.load()
    # load() may replace the config with the checkpoint's copy; the caller-supplied one wins.
    self.training_config = training_config

compute_loss(model, batch) abstractmethod

Compute the loss

Parameters:

Name Type Description Default
model Module

the model to train

required
batch dict

the batch of data

required

Returns:

Type Description
Tensor | dict

Tensor | dict: loss Tensor or a dictionary containing the loss key

Source code in src/ls_mlkit/pipeline/pipeline.py
178
179
180
181
182
183
184
185
186
187
188
@abstractmethod
def compute_loss(self, model: torch.nn.Module, batch: dict) -> Tensor | dict:
    """Compute the loss

    Args:
        model (torch.nn.Module): the model to train
        batch (dict): the batch of data

    Returns:
        Tensor | dict: loss Tensor or a dictionary containing the loss key
    """

train()

Train the model

Returns:

Type Description
None

None

Source code in src/ls_mlkit/pipeline/pipeline.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def train(self) -> None:
    """Train the model for ``training_config.n_epochs`` epochs.

    Epochs already completed according to ``training_state.current_epoch`` (e.g. after
    resuming from a checkpoint) are skipped. A final checkpoint is saved at the end.

    Returns:
        None
    """
    self.trigger_callbacks(event=CallbackEvent.TRAINING_START)
    n_epochs = self.training_config.n_epochs

    for epoch in tqdm(range(n_epochs), desc="training", mininterval=0):
        if epoch < self.training_state.current_epoch:
            continue
        self.train_an_epoch()
        # Decide eval/save while ``current_epoch`` still indexes the epoch that just finished:
        # the ``_can_*`` helpers add 1 themselves, so checking after the increment shifted
        # the schedule by one epoch (e.g. save_epochs=2 saved after epochs 1, 3, ...).
        should_eval = self._can_eval(flag="epochs")
        should_save = self._can_save(flag="epochs")
        # Advance before saving so a resumed run starts at the next epoch.
        self.training_state.current_epoch += 1

        if should_eval:
            self.eval()
        if should_save:
            self.save()
    self.save()
    self.trigger_callbacks(event=CallbackEvent.TRAINING_END)

train_an_epoch()

Train the model for one epoch

Returns:

Type Description
None

None

Source code in src/ls_mlkit/pipeline/pipeline.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def train_an_epoch(self):
    """Train the model for one epoch

    Steps before ``training_state.current_step_in_epoch`` are skipped so a run resumed
    mid-epoch continues where it stopped.

    Returns:
        None
    """
    self.trigger_callbacks(event=CallbackEvent.EPOCH_START)
    result = None
    for step, batch in enumerate(self.dataloader):
        if step < self.training_state.current_step_in_epoch:
            continue
        result = self.train_a_step(batch)

        if self._can_eval(flag="steps"):
            self.eval()
        # Check the save condition before the counters advance (``_can_save`` adds 1 itself),
        # but write the checkpoint afterwards so it records this step as done; otherwise a
        # resumed run would train this batch a second time.
        should_save = self._can_save(flag="steps")
        self.training_state.current_step_in_epoch += 1
        self.training_state.current_global_step += 1
        if should_save:
            self.save()

    # ``result`` stays None when the dataloader is empty or every step was done before a resume.
    if result is not None and self._can_log(flag="epochs") and self.logger is not None:
        self.logger.info(
            f"[Training] Epoch {self.training_state.current_epoch}, Step {self.training_state.current_step_in_epoch}, Loss {result['loss']}"
        )
        wandb.log(result, step=self.training_state.current_epoch)
    self.training_state.current_step_in_epoch = 0
    self.trigger_callbacks(event=CallbackEvent.EPOCH_END)

train_a_step(batch)

Train the model for one step

Parameters:

Name Type Description Default
batch Dict[str, Any]

the batch of data

required

Returns:

Name Type Description
dict dict

a dictionary containing the loss key

Source code in src/ls_mlkit/pipeline/pipeline.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
def train_a_step(self, batch: Dict[str, Any]) -> dict:
    """Train the model for one step

    Tensor values of ``batch`` are moved to ``training_config.device`` in place. The loss is
    divided by ``gradient_accumulation_steps``; the optimizer and the LR scheduler step only
    on every ``gradient_accumulation_steps``-th global step.

    Args:
        batch (Dict[str, Any]): the batch of data

    Returns:
        dict: a dictionary containing the loss key (the accumulation-scaled loss value)
    """
    self.trigger_callbacks(event=CallbackEvent.STEP_START, batch=batch)
    model = self.model
    optimizer = self.optimizer
    scheduler = self.scheduler
    logger = self.logger
    device = self.training_config.device
    model.train()
    for key, value in batch.items():
        if isinstance(value, torch.Tensor):
            batch[key] = value.to(device)
    model = model.to(device)

    raw_loss = self.compute_loss(model, batch)
    if isinstance(raw_loss, Tensor):
        loss = raw_loss
    else:
        loss = cast(Tensor, cast(Dict[str, Any], raw_loss)["loss"])
    loss = loss / self.training_config.gradient_accumulation_steps
    self.trigger_callbacks(event=CallbackEvent.PRE_BACKWARD, loss=loss)
    loss.backward()
    self.trigger_callbacks(event=CallbackEvent.POST_BACKWARD, loss=loss)

    if ((self.training_state.current_global_step + 1) % self.training_config.gradient_accumulation_steps) == 0:
        self.gradient_clip()
        self.trigger_callbacks(event=CallbackEvent.PRE_OPTIMIZER_STEP)
        optimizer.step()
        self.trigger_callbacks(event=CallbackEvent.POST_OPTIMIZER_STEP)
        optimizer.zero_grad()
        # Advance the LR schedule once per optimizer update, not once per micro-batch.
        if scheduler is not None:
            scheduler.step()

    loss_value = loss.item()
    if self._can_log(flag="steps") and logger is not None:
        logger.info(
            f"[Training] Epoch {self.training_state.current_epoch}, Step {self.training_state.current_step_in_epoch}, Loss {loss_value}"
        )
    self.trigger_callbacks(event=CallbackEvent.STEP_END, batch=batch)
    return {
        "loss": loss_value,
    }

gradient_clip()

Clip the gradient

Returns:

Type Description
None

None

Source code in src/ls_mlkit/pipeline/pipeline.py
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
def gradient_clip(self) -> None:
    """Clip the gradients of ``self.model`` in place.

    The behaviour is selected by ``training_config.grad_clip_strategy``:
    "norm" rescales so the total L2 norm is at most ``max_grad_norm``,
    "value" clamps every element to ``[-max_grad_value, max_grad_value]``,
    and any other value leaves the gradients untouched.

    Returns:
        None
    """
    cfg = self.training_config
    strategy = cfg.grad_clip_strategy
    if strategy == "norm":
        torch.nn.utils.clip_grad_norm_(
            self.model.parameters(),
            max_norm=cfg.max_grad_norm,
            norm_type=2,
            error_if_nonfinite=False,
        )
    elif strategy == "value":
        torch.nn.utils.clip_grad_value_(self.model.parameters(), clip_value=cfg.max_grad_value)

eval()

Evaluate the model

Source code in src/ls_mlkit/pipeline/pipeline.py
309
310
def eval(self):
    """Evaluation hook; the base implementation does nothing and returns None."""
    return None

save()

Save the checkpoint

Returns:

Type Description
None

None

Source code in src/ls_mlkit/pipeline/pipeline.py
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
def save(self) -> None:
    """Write a checkpoint of the pipeline under ``training_config.save_dir``.

    The model, optimizer, scheduler, training state and both configs are saved with
    ``torch.save`` into a ``tmp_``-prefixed staging directory that is then renamed to
    its final ``checkpoint_*`` name, and old checkpoints are pruned afterwards.
    Nothing is written when ``save_dir`` is unset/empty or a checkpoint with the
    same name already exists.

    Raises:
        Exception: any error while writing, renaming or pruning is logged and re-raised
            after the staging directory is removed.

    Returns:
        None
    """
    self.trigger_callbacks(event=CallbackEvent.PRE_SAVE)
    root = self.training_config.save_dir
    if root in (None, ""):
        return
    os.makedirs(root, exist_ok=True)

    state = self.training_state
    name = self._get_checkpoint_name(state.current_epoch, state.current_step_in_epoch, state.current_global_step)
    final_dir = os.path.join(root, name)
    if os.path.exists(final_dir):
        return
    staging_dir = os.path.join(root, f"tmp_{name}")

    os.makedirs(staging_dir, exist_ok=True)
    try:
        for attr in ("model", "optimizer", "scheduler", "training_state", "training_config", "log_config"):
            torch.save(getattr(self, attr), os.path.join(staging_dir, f"{attr}.pth"))
        # Rename only after every file is written so a crash never leaves a partial checkpoint.
        os.rename(staging_dir, final_dir)
        self._cleanup_old_checkpoints(save_dir=root)
        if self.logger is not None:
            self.logger.info(f"Model saved to {final_dir}")
    except Exception as e:
        if self.logger is not None:
            self.logger.error(f"Failed to save checkpoint: {e}")
        shutil.rmtree(staging_dir, ignore_errors=True)
        raise
    self.trigger_callbacks(event=CallbackEvent.POST_SAVE)

load()

Load the checkpoint

Returns:

Type Description
None

None

Source code in src/ls_mlkit/pipeline/pipeline.py
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
def load(self) -> None:
    """Load the latest checkpoint from ``training_config.save_dir`` (if any).

    ``model``, ``optimizer`` and ``scheduler`` are restored *into the existing objects* via
    ``load_state_dict``. Each file is pickled on its own, so assigning the unpickled optimizer
    directly would leave it holding copies of the parameters instead of ``self.model``'s
    (and the scheduler holding yet another optimizer), and training would silently stop
    updating the model. Other entries are assigned as-is.

    Returns:
        None
    """
    self.trigger_callbacks(event=CallbackEvent.PRE_LOAD)
    # check load condition ============================================================================
    checkpoint_dir = self.get_latest_checkpoint_dir()
    if checkpoint_dir is None or len(os.listdir(checkpoint_dir)) <= 0:
        return

    # load ============================================================================================

    for base_name in [
        "model",
        "optimizer",
        "scheduler",
        "training_state",
        "training_config",
        "log_config",
    ]:
        file_path = os.path.join(checkpoint_dir, f"{base_name}.pth")
        if not os.path.exists(file_path):
            if self.logger is not None:
                self.logger.info(f"File {file_path} does not exist")
            continue
        # SECURITY NOTE: weights_only=False unpickles arbitrary objects; only load trusted checkpoints.
        loaded = torch.load(file_path, weights_only=False)
        current = getattr(self, base_name, None)
        if (
            base_name in ("model", "optimizer", "scheduler")
            and loaded is not None
            and current is not None
            and hasattr(current, "load_state_dict")
        ):
            # Accept both a pickled object (current save format) and a plain state dict.
            state = loaded.state_dict() if hasattr(loaded, "state_dict") else loaded
            current.load_state_dict(state)
        else:
            setattr(self, base_name, loaded)

    if self.logger is not None:
        self.logger.info(f"Model loaded from {checkpoint_dir}")
    self.trigger_callbacks(event=CallbackEvent.POST_LOAD)

add_callbacks(callbacks)

Add a list of callbacks

Parameters:

Name Type Description Default
callbacks List[BaseCallback] | None

the callbacks to add

required
Source code in src/ls_mlkit/pipeline/pipeline.py
462
463
464
465
466
467
468
def add_callbacks(self, callbacks: List[BaseCallback] | None):
    """Register extra callbacks on this pipeline.

    Args:
        callbacks (List[BaseCallback] | None): callbacks to register; forwarded as-is to
            ``self.callback_manager.add_callbacks``
    """
    manager = self.callback_manager
    manager.add_callbacks(callbacks=callbacks)

trigger_callbacks(event, **kwargs)

Trigger all callbacks for a given event

Parameters:

Name Type Description Default
event CallbackEvent

the event to trigger

required
**kwargs

the keyword arguments to pass to the callback

{}
Source code in src/ls_mlkit/pipeline/pipeline.py
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
def trigger_callbacks(self, event: CallbackEvent, **kwargs):
    """Fire ``event`` on every registered callback.

    Each callback receives the pipeline's training/log configs, training state, logger,
    model, dataloader, optimizer and scheduler as keyword arguments, followed by any
    extra ``kwargs`` supplied by the caller.

    Args:
        event (CallbackEvent): the event to trigger
        **kwargs: extra keyword arguments forwarded to the callbacks
    """
    context = dict(
        training_config=self.training_config,
        log_config=self.log_config,
        training_state=self.training_state,
        logger=self.logger,
        model=self.model,
        dataloader=self.dataloader,
        optimizer=self.optimizer,
        scheduler=self.scheduler,
    )
    self.callback_manager.trigger(event=event, **context, **kwargs)

LogConfig

Source code in src/ls_mlkit/pipeline/pipeline.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class LogConfig:
    def __init__(
        self,
        log_dir: str = "logs",
        log_steps: int = 100,
        log_epochs: int = 1,
        log_strategy: Literal["epochs", "steps"] = "epochs",
        *args,
        **kwargs,
    ):
        """Hold the logging settings of a pipeline.

        Extra positional and keyword arguments are accepted and ignored.

        Args:
            log_dir (str, optional): directory for the logs. Defaults to "logs".
            log_steps (int, optional): log every ``log_steps`` steps. Defaults to 100.
            log_epochs (int, optional): log every ``log_epochs`` epochs. Defaults to 1.
            log_strategy (Literal["epochs", "steps"], optional): whether and when metrics are logged.
                Defaults to "epochs".
        """
        self.log_dir, self.log_steps, self.log_epochs, self.log_strategy = (
            log_dir,
            log_steps,
            log_epochs,
            log_strategy,
        )

__init__(log_dir='logs', log_steps=100, log_epochs=1, log_strategy='epochs', *args, **kwargs)

Initialize the LogConfig

Parameters:

Name Type Description Default
log_dir str

directory to save the logs. Defaults to "logs".

'logs'
log_steps int

log the metrics every n steps. Defaults to 100.

100
log_epochs int

log the metrics every n epochs. Defaults to 1.

1
log_strategy Literal["epochs", "steps"]

log strategy determines whether to log the metrics and when to log them. Defaults to "epochs".

'epochs'
Source code in src/ls_mlkit/pipeline/pipeline.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def __init__(
    self,
    log_dir: str = "logs",
    log_steps: int = 100,
    log_epochs: int = 1,
    log_strategy: Literal["epochs", "steps"] = "epochs",
    *args,
    **kwargs,
):
    """Hold the logging settings of a pipeline.

    Extra positional and keyword arguments are accepted and ignored.

    Args:
        log_dir (str, optional): directory for the logs. Defaults to "logs".
        log_steps (int, optional): log every ``log_steps`` steps. Defaults to 100.
        log_epochs (int, optional): log every ``log_epochs`` epochs. Defaults to 1.
        log_strategy (Literal["epochs", "steps"], optional): whether and when metrics are logged.
            Defaults to "epochs".
    """
    self.log_dir, self.log_steps, self.log_epochs, self.log_strategy = (
        log_dir,
        log_steps,
        log_epochs,
        log_strategy,
    )

TrainingConfig

Source code in src/ls_mlkit/pipeline/pipeline.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class TrainingConfig:
    def __init__(
        self,
        n_epochs: int = 100,
        batch_size: int = 4,
        device: str = "cuda",
        save_strategy: Literal["epochs", "steps", None] = "epochs",
        save_dir: str | None = None,
        save_steps: int = 10,
        save_epochs: int = 1,
        save_total_limit: int = 5,
        num_workers: int = 4,
        train_shuffle: bool = True,
        eval_strategy: Literal["epochs", "steps"] | None = None,
        eval_steps: int = 500,
        eval_epochs: int = 1,
        grad_clip_strategy: Literal["norm", "value", None] = "norm",
        max_grad_norm: float = 1.0,
        max_grad_value: float = 1.0,
        gradient_accumulation_steps: int = 1,
        *args,
        **kwargs,
    ):
        """Initialize the TrainingConfig

        Args:
            n_epochs (int, optional): the number of epochs
            batch_size (int, optional): batch size. Defaults to 4.
            device (str, optional): device in training. Defaults to "cuda".
            save_strategy (Literal[&quot;epochs&quot;, &quot;steps&quot;, None], optional): save strategy determines whether to save the model and when to save it. Defaults to "epochs".
            save_dir (str, optional): directory to save the model. Defaults to None.
            save_steps (int, optional): save the model every n steps. Defaults to 10.
            save_epochs (int, optional): save the model every n epochs. Defaults to 1.
            save_total_limit (int, optional): maximum number of checkpoints to save. Defaults to 5.
            num_workers (int, optional): number of workers to use for data loading. Defaults to 4.
            train_shuffle (bool, optional): whether to shuffle the training data. Defaults to True.
            eval_strategy (Literal[&quot;epochs&quot;, &quot;steps&quot;], optional): evaluation strategy determines whether to evaluate the model and when to evaluate it. Defaults to None.
            eval_steps (int, optional): evaluate the model every n steps. Defaults to 500.
            eval_epochs (int, optional): evaluate the model every n epochs. Defaults to 1.
            grad_clip_strategy (Literal[&quot;norm&quot;, &quot;value&quot;, None], optional): gradient clip strategy determines whether to clip the gradient and how to clip it. Defaults to "norm".
            max_grad_norm (float, optional): maximum gradient norm. Defaults to 1.0.
            max_grad_value (float, optional): maximum gradient value. Defaults to 1.0.
            gradient_accumulation_steps (int, optional): number of steps to accumulate gradients before updating the model. Defaults to 1.

        Returns:
            None
        """
        self.n_epochs = n_epochs
        self.batch_size = batch_size
        self.device = device
        self.save_strategy = save_strategy
        self.save_dir = save_dir
        self.save_steps = save_steps
        self.save_epochs = save_epochs
        self.num_workers = num_workers
        self.train_shuffle = train_shuffle
        self.save_total_limit = save_total_limit
        self.eval_strategy = eval_strategy
        self.eval_steps = eval_steps
        self.eval_epochs = eval_epochs
        self.grad_clip_strategy = grad_clip_strategy
        self.max_grad_norm = max_grad_norm
        self.max_grad_value = max_grad_value
        self.gradient_accumulation_steps = gradient_accumulation_steps

__init__(n_epochs=100, batch_size=4, device='cuda', save_strategy='epochs', save_dir=None, save_steps=10, save_epochs=1, save_total_limit=5, num_workers=4, train_shuffle=True, eval_strategy=None, eval_steps=500, eval_epochs=1, grad_clip_strategy='norm', max_grad_norm=1.0, max_grad_value=1.0, gradient_accumulation_steps=1, *args, **kwargs)

Initialize the TrainingConfig

Parameters:

Name Type Description Default
n_epochs int

the number of epochs. Defaults to 100.

100
batch_size int

batch size. Defaults to 4.

4
device str

device in training. Defaults to "cuda".

'cuda'
save_strategy Literal["epochs", "steps", None]

save strategy determines whether to save the model and when to save it. Defaults to "epochs".

'epochs'
save_dir str | None

directory to save the model. Defaults to None.

None
save_steps int

save the model every n steps. Defaults to 10.

10
save_epochs int

save the model every n epochs. Defaults to 1.

1
save_total_limit int

maximum number of checkpoints to save. Defaults to 5.

5
num_workers int

number of workers to use for data loading. Defaults to 4.

4
train_shuffle bool

whether to shuffle the training data. Defaults to True.

True
eval_strategy Literal["epochs", "steps"] | None

evaluation strategy determines whether to evaluate the model and when to evaluate it. Defaults to None.

None
eval_steps int

evaluate the model every n steps. Defaults to 500.

500
eval_epochs int

evaluate the model every n epochs. Defaults to 1.

1
grad_clip_strategy Literal["norm", "value", None]

gradient clip strategy determines whether to clip the gradient and how to clip it. Defaults to "norm".

'norm'
max_grad_norm float

maximum gradient norm. Defaults to 1.0.

1.0
max_grad_value float

maximum gradient value. Defaults to 1.0.

1.0
gradient_accumulation_steps int

number of steps to accumulate gradients before updating the model. Defaults to 1.

1

Returns:

Type Description

None

Source code in src/ls_mlkit/pipeline/pipeline.py (lines 18–80)
def __init__(
    self,
    n_epochs: int = 100,
    batch_size: int = 4,
    device: str = "cuda",
    save_strategy: Literal["epochs", "steps", None] = "epochs",
    save_dir: str | None = None,
    save_steps: int = 10,
    save_epochs: int = 1,
    save_total_limit: int = 5,
    num_workers: int = 4,
    train_shuffle: bool = True,
    eval_strategy: Literal["epochs", "steps"] | None = None,
    eval_steps: int = 500,
    eval_epochs: int = 1,
    grad_clip_strategy: Literal["norm", "value", None] = "norm",
    max_grad_norm: float = 1.0,
    max_grad_value: float = 1.0,
    gradient_accumulation_steps: int = 1,
    *args,
    **kwargs,
):
    """Initialize the TrainingConfig

    Args:
        n_epochs (int, optional): the number of epochs
        batch_size (int, optional): batch size. Defaults to 4.
        device (str, optional): device in training. Defaults to "cuda".
        save_strategy (Literal[&quot;epochs&quot;, &quot;steps&quot;, None], optional): save strategy determines whether to save the model and when to save it. Defaults to "epochs".
        save_dir (str, optional): directory to save the model. Defaults to None.
        save_steps (int, optional): save the model every n steps. Defaults to 10.
        save_epochs (int, optional): save the model every n epochs. Defaults to 1.
        save_total_limit (int, optional): maximum number of checkpoints to save. Defaults to 5.
        num_workers (int, optional): number of workers to use for data loading. Defaults to 4.
        train_shuffle (bool, optional): whether to shuffle the training data. Defaults to True.
        eval_strategy (Literal[&quot;epochs&quot;, &quot;steps&quot;], optional): evaluation strategy determines whether to evaluate the model and when to evaluate it. Defaults to None.
        eval_steps (int, optional): evaluate the model every n steps. Defaults to 500.
        eval_epochs (int, optional): evaluate the model every n epochs. Defaults to 1.
        grad_clip_strategy (Literal[&quot;norm&quot;, &quot;value&quot;, None], optional): gradient clip strategy determines whether to clip the gradient and how to clip it. Defaults to "norm".
        max_grad_norm (float, optional): maximum gradient norm. Defaults to 1.0.
        max_grad_value (float, optional): maximum gradient value. Defaults to 1.0.
        gradient_accumulation_steps (int, optional): number of steps to accumulate gradients before updating the model. Defaults to 1.

    Returns:
        None
    """
    self.n_epochs = n_epochs
    self.batch_size = batch_size
    self.device = device
    self.save_strategy = save_strategy
    self.save_dir = save_dir
    self.save_steps = save_steps
    self.save_epochs = save_epochs
    self.num_workers = num_workers
    self.train_shuffle = train_shuffle
    self.save_total_limit = save_total_limit
    self.eval_strategy = eval_strategy
    self.eval_steps = eval_steps
    self.eval_epochs = eval_epochs
    self.grad_clip_strategy = grad_clip_strategy
    self.max_grad_norm = max_grad_norm
    self.max_grad_value = max_grad_value
    self.gradient_accumulation_steps = gradient_accumulation_steps

TrainingState

Source code in src/ls_mlkit/pipeline/pipeline.py (lines 107–123)
class TrainingState:
    """Progress counters for a training run (epoch, step within epoch, global step)."""

    def __init__(
        self,
        current_epoch: int = 0,
        current_step_in_epoch: int = 0,
        current_global_step: int = 0,
    ):
        """Set the starting values of the progress counters.

        Args:
            current_epoch (int, optional): the current epoch. Defaults to 0.
            current_step_in_epoch (int, optional): the current step within the
                epoch. Defaults to 0.
            current_global_step (int, optional): the current global step.
                Defaults to 0.
        """
        # Targets are assigned left to right, preserving the attribute order.
        (
            self.current_epoch,
            self.current_step_in_epoch,
            self.current_global_step,
        ) = (current_epoch, current_step_in_epoch, current_global_step)

__init__(current_epoch=0, current_step_in_epoch=0, current_global_step=0)

Initialize the TrainingState

Parameters:

Name Type Description Default
current_epoch int

the current epoch. Defaults to 0.

0
current_step_in_epoch int

the current step in the epoch. Defaults to 0.

0
current_global_step int

the current global step. Defaults to 0.

0
Source code in src/ls_mlkit/pipeline/pipeline.py (lines 108–123)
def __init__(
    self,
    current_epoch: int = 0,
    current_step_in_epoch: int = 0,
    current_global_step: int = 0,
):
    """Set the starting values of the TrainingState progress counters.

    Args:
        current_epoch (int, optional): the current epoch. Defaults to 0.
        current_step_in_epoch (int, optional): the current step within the
            epoch. Defaults to 0.
        current_global_step (int, optional): the current global step.
            Defaults to 0.
    """
    # Targets are assigned left to right, preserving the attribute order.
    (
        self.current_epoch,
        self.current_step_in_epoch,
        self.current_global_step,
    ) = (current_epoch, current_step_in_epoch, current_global_step)