Utils

Functions:

calculate_advantage(row)

Calculate advantage values for a row of data.

Parameters:

  • row (dict) –

    Dictionary containing rewards and statistics with keys:

    • rewards: List of reward values
    • reward_mean: Mean reward value
    • reward_std: Standard deviation of rewards

Returns:

  • list[float]

    List of advantage values calculated as (reward - mean) / (std + eps), where eps = 1e-4 is added for numerical stability (a NaN std is first mapped to 0 via np.nan_to_num)

Source code in tapeagents/finetune/rl/utils.py
def calculate_advantage(row):
    """
    Calculate advantage values for a row of data.

    Args:
        row (dict): Dictionary containing rewards and statistics with keys:

            - rewards: List of reward values
            - reward_mean: Mean reward value
            - reward_std: Standard deviation of rewards

    Returns:
        (list[float]): List of advantage values calculated as (reward - mean) / (std + eps),
            where eps=1e-4 is added for numerical stability
    """
    rewards = row["rewards"]
    mean = row["reward_mean"]
    std = row["reward_std"]
    return [(reward - mean) / (np.nan_to_num(std) + 1e-4) for reward in rewards]
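
A minimal usage sketch; the reward values and statistics below are made up for illustration:

import numpy as np

row = {
    "rewards": [1.0, 0.0, 0.5],   # hypothetical sampled rewards
    "reward_mean": 0.5,           # hypothetical precomputed group mean
    "reward_std": 0.5,            # hypothetical precomputed group std
}
calculate_advantage(row)  # ≈ [0.9998, -0.9998, 0.0]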

calculate_reward_with_implicit_kl(row, reward_minus_kl_coef)

Calculate reward with implicit KL penalty.

Parameters:

  • row (dict) –

    Dictionary containing reward and log probability data with keys:

    • reward: Base reward value
    • old_logprobs: Log probabilities from old policy
    • ref_logprobs: Reference log probabilities
  • reward_minus_kl_coef (float) –

    Coefficient for implicit KL penalty term

Returns:

  • float

    Reward value adjusted by the implicit KL penalty, calculated as reward - reward_minus_kl_coef * KL(ref||old). The KL divergence is approximated with the Schulman estimator: KL ≈ exp(log_ratio) - log_ratio - 1, where log_ratio = ref_logprobs - old_logprobs.

Source code in tapeagents/finetune/rl/utils.py
def calculate_reward_with_implicit_kl(row, reward_minus_kl_coef):
    """
    Calculate reward with implicit KL penalty.

    Args:
        row (dict): Dictionary containing reward and log probability data with keys:

            - reward: Base reward value
            - old_logprobs: Log probabilities from old policy
            - ref_logprobs: Reference log probabilities
        reward_minus_kl_coef (float): Coefficient for implicit KL penalty term

    Returns:
        (float): Reward value adjusted by implicit KL penalty, calculated as:
            reward - reward_minus_kl_coef * KL(ref||old)
            The KL divergence is approximated using the Schulman approximation:
            KL ≈ exp(log_ratio) - log_ratio - 1
            where log_ratio = ref_logprobs - old_logprobs
    """
    reward = row["reward"]
    old_logprobs = row["old_logprobs"]
    ref_logprobs = row["ref_logprobs"]
    log_ratio_ref_old = ref_logprobs - old_logprobs
    kl = (np.exp(log_ratio_ref_old) - log_ratio_ref_old - 1).sum()  # Schulman KL approx
    return reward - reward_minus_kl_coef * kl
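
A short sketch with hypothetical per-token log-probabilities, showing how the Schulman estimator shrinks the reward:

import numpy as np

row = {
    "reward": 1.0,
    "old_logprobs": np.array([-0.5, -1.0, -0.2]),  # hypothetical values
    "ref_logprobs": np.array([-0.6, -0.9, -0.3]),  # hypothetical values
}
# log_ratio = [-0.1, 0.1, -0.1], so KL ≈ sum(exp(d) - d - 1) ≈ 0.0148
calculate_reward_with_implicit_kl(row, reward_minus_kl_coef=0.1)  # ≈ 0.9985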

masked_mean(values, mask, axis=None)

Compute the mean of tensor elements selected by a mask.

Source code in tapeagents/finetune/rl/utils.py
def masked_mean(values: torch.Tensor, mask: torch.Tensor, axis: Optional[int] = None) -> torch.Tensor:
    """Compute the mean of tensor elements selected by a mask."""
    if axis is not None:
        return (values * mask).sum(axis=axis) / mask.sum(axis=axis)  # type: ignore
    else:
        return (values * mask).sum() / mask.sum()
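
For illustration, with hypothetical tensors where the mask zeroes out the last two positions:

import torch

values = torch.tensor([2.0, 4.0, 6.0, 8.0])
mask = torch.tensor([1.0, 1.0, 0.0, 0.0])
masked_mean(values, mask)  # (2 + 4) / 2 -> tensor(3.)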

masked_sum(values, mask, axis=None)

Compute the sum of tensor elements selected by a mask.

Source code in tapeagents/finetune/rl/utils.py
def masked_sum(values: torch.Tensor, mask: torch.Tensor, axis: Optional[int] = None) -> torch.Tensor:
    """Compute the sum of tensor elements selected by a mask."""
    if axis is not None:
        return (values * mask).sum(axis=axis)  # type: ignore
    else:
        return (values * mask).sum()
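
With an axis argument the sum is taken per row; hypothetical tensors again:

import torch

values = torch.tensor([[1.0, 2.0], [3.0, 4.0]])
mask = torch.tensor([[1.0, 0.0], [1.0, 1.0]])
masked_sum(values, mask, axis=1)  # tensor([1., 7.])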

replace_dataset_column(dataset, column_name, new_column)

Replace a column in the dataset with a new column.

Source code in tapeagents/finetune/rl/utils.py
def replace_dataset_column(dataset: Dataset, column_name: str, new_column: List[List[float]]) -> Dataset:
    """
    Replace a column in the dataset with a new column.
    """
    if column_name in dataset.features:
        dataset = dataset.map(remove_columns=[column_name])
    dataset = dataset.add_column(name=column_name, column=new_column)  # type: ignore

    return dataset
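
A usage sketch on a toy Hugging Face dataset; the column name and values are illustrative:

from datasets import Dataset

ds = Dataset.from_dict({"text": ["a", "b"], "advantages": [[0.0], [0.0]]})
ds = replace_dataset_column(ds, "advantages", [[0.5], [-0.5]])
print(ds["advantages"])  # [[0.5], [-0.5]]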