
Commit b4c075d

facebook-github-bot authored and committed
Disk (#706)
Summary:

## Types of changes
- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [x] Breaking change (fix or feature that would cause existing functionality to change)
- [ ] Docs change / refactoring / dependency upgrade

## Motivation and Context / Related issue
It introduces a set of new optimizers called DiSK, which uses a simplified Kalman filter to improve optimizer performance.

## How Has This Been Tested (if it applies)
It is tested with the mnist.py from the example folder (with modifications for DiSK) to ensure all the functions work.

## Checklist
Not sure whether to add documents.
- [ ] The documentation is up-to-date with the changes I made.
- [x] I have read the **CONTRIBUTING** document and completed the CLA (see **CONTRIBUTING**).
- [x] All tests passed, and additional code has been covered with new tests.

Pull Request resolved: #706

Reviewed By: HuanyuZhang

Differential Revision: D67626897

Pulled By: iden-kalemaj

fbshipit-source-id: 3ac3caf5212920afdae7b4a8ef71bd3868073731
1 parent 4b0cd91 commit b4c075d

File tree

10 files changed: +1004 -0 lines changed

Lines changed: 56 additions & 0 deletions
@@ -0,0 +1,56 @@
from typing import List, Union

from opacus.optimizers import DPOptimizer
from opacus.privacy_engine import PrivacyEngine
from torch import optim

from .optimizers import KF_DPOptimizer, get_optimizer_class


class KF_PrivacyEngine(PrivacyEngine):
    def __init__(self, *, accountant: str = "prv", secure_mode: bool = False):
        super().__init__(accountant=accountant, secure_mode=secure_mode)

    def _prepare_optimizer(
        self,
        *,
        optimizer: optim.Optimizer,
        noise_multiplier: float,
        max_grad_norm: Union[float, List[float]],
        expected_batch_size: int,
        loss_reduction: str = "mean",
        distributed: bool = False,
        clipping: str = "flat",
        noise_generator=None,
        grad_sample_mode="hooks",
        kalman: bool = False,
        **kwargs,
    ) -> DPOptimizer:
        if kalman and isinstance(optimizer, KF_DPOptimizer):
            optimizer = optimizer.original_optimizer
        elif not kalman and isinstance(optimizer, DPOptimizer):
            optimizer = optimizer.original_optimizer

        generator = None
        if self.secure_mode:
            generator = self.secure_rng
        elif noise_generator is not None:
            generator = noise_generator

        optim_class = get_optimizer_class(
            clipping=clipping,
            distributed=distributed,
            grad_sample_mode=grad_sample_mode,
            kalman=kalman,
        )

        return optim_class(
            optimizer=optimizer,
            noise_multiplier=noise_multiplier,
            max_grad_norm=max_grad_norm,
            expected_batch_size=expected_batch_size,
            loss_reduction=loss_reduction,
            generator=generator,
            secure_mode=self.secure_mode,
            **kwargs,
        )

research/disk_optimizer/ReadMe.md

Lines changed: 69 additions & 0 deletions
@@ -0,0 +1,69 @@
# DiSK: Differentially Private Optimizer with Simplified Kalman Filter for Noise Reduction

## Introduction
This part of the code introduces a new optimizer component named DiSK, which uses a simplified Kalman filter to improve the privatized gradient estimate. Specifically, the privatized minibatch gradient is replaced with:

$$g_{t+\frac{1}{2}} = \frac{1}{B}\sum_{\xi \in \mathcal{B}_t} \mathrm{clip}_C\left(\frac{1-\kappa}{\kappa\gamma}\nabla f(x_t + \gamma(x_t-x_{t-1});\xi) + \Big(1- \frac{1-\kappa}{\kappa\gamma}\Big)\nabla f(x_t;\xi)\right) + w_t$$

$$g_{t} = (1-\kappa)g_{t-1} + \kappa g_{t+\frac{1}{2}}$$

A detailed description of the algorithm can be found [here](https://arxiv.org/abs/2410.03883).
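
To make the two update equations concrete, here is a minimal, framework-free sketch of the combine-clip-smooth steps. It is illustrative only (not the optimizer code from this commit); the names `grads_shifted`, `grads_plain`, `g_prev`, `C`, and `noise_std` are assumed stand-ins for the per-sample gradients at the shifted and current iterates, the previous filtered gradient, the clipping norm, and the privatization noise scale.

```python
import torch


def disk_gradient(grads_shifted, grads_plain, g_prev, kappa, gamma, C, noise_std):
    """grads_shifted[i] ~ grad f(x_t + gamma*(x_t - x_{t-1}); xi_i),
    grads_plain[i] ~ grad f(x_t; xi_i); both are lists of flattened per-sample gradients."""
    c = (1 - kappa) / (kappa * gamma)
    clipped = []
    for gs, gp in zip(grads_shifted, grads_plain):
        combo = c * gs + (1 - c) * gp                                # inner combination
        combo = combo * (C / (combo.norm() + 1e-12)).clamp(max=1.0)  # clip to norm C
        clipped.append(combo)
    # g_{t+1/2}: clipped minibatch mean plus Gaussian privatization noise w_t
    g_half = torch.stack(clipped).mean(dim=0) + noise_std * torch.randn_like(g_prev)
    # g_t: Kalman-style smoothing with the previous filtered gradient
    return (1 - kappa) * g_prev + kappa * g_half
```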

## Usage
The code provides a modified privacy engine with three extra arguments (a small numeric sketch of how they interact follows this list):

* kalman: bool=False
* kappa: float=0.7
* gamma: float=0.5
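
A quick way to see how `kappa` and `gamma` interact is to compute the gradient-combination weight and the noise rescaling they imply. This mirrors the scaling logic in `KF_AdaClipDPOptimizer` later in this diff, but the snippet itself is only an illustration, not library code.

```python
import math

kappa, gamma = 0.7, 0.5  # the defaults listed above

if gamma == 0 or abs(gamma - (1 - kappa) / kappa) < 1e-3:
    # Degenerate choice: gamma is snapped to (1 - kappa) / kappa, the weight on
    # the plain gradient vanishes (c = 1), and only one gradient evaluation per
    # sample is needed.
    gamma = (1 - kappa) / kappa
    c = 1.0
else:
    c = (1 - kappa) / (gamma * kappa)  # weight on the shifted-point gradient

norm_factor = math.sqrt(c**2 + (1 - c) ** 2)
print(round(c, 3), round(norm_factor, 3))  # 0.857 0.869 for the defaults
# The KF_* optimizers divide the configured noise_multiplier by norm_factor, so
# the noise standard deviation actually applied per step is based on
# noise_multiplier / norm_factor.
```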

To use DiSK, follow these steps:

**Step I:** Import KF_PrivacyEngine from KFprivacy_engine.py and set ```kalman=True```

**Step II:** Define a closure (see [here](https://pytorch.org/docs/stable/optim.html#optimizer-step-closure) for an example) that computes the loss and calls backward **without** ```zero_grad()```, then perform ```optimizer.step(closure)```

Example of using the DiSK optimizers:

```python
from KFprivacy_engine import KF_PrivacyEngine

# ...
# follow the same steps as the original Opacus training scripts
privacy_engine = KF_PrivacyEngine()
model, optimizer, train_loader = privacy_engine.make_private(
    module=model,
    optimizer=optimizer,
    data_loader=train_loader,
    noise_multiplier=args.sigma,
    max_grad_norm=max_grad_norm,
    clipping=clipping,
    grad_sample_mode=args.grad_sample_mode,
    kalman=True,  # required for DiSK
    kappa=0.7,    # optional
    gamma=0.5,    # optional
)

# ...
# during training:
def closure():
    # compute loss and backward; adapted from the example in examples/cifar10.py
    output = model(images)
    loss = criterion(output, target)
    loss.backward()
    return output, loss

output, loss = optimizer.step(closure)
optimizer.zero_grad()
# compute other metrics
# ...
```
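
For comparison, the only structural change relative to a standard Opacus training loop is that the forward/backward pass moves into the closure and ```zero_grad()``` is called after the step. A plain (non-DiSK) loop over the same assumed names (`model`, `criterion`, `train_loader`) would look roughly like this:

```python
# Standard Opacus pattern, shown only for contrast with the closure-based step above.
for images, target in train_loader:
    optimizer.zero_grad()
    output = model(images)
    loss = criterion(output, target)
    loss.backward()
    optimizer.step()
```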

## Citation
Consider citing the paper if you use DiSK in your work, as follows:

```
@article{zhang2024disk,
  title={{DiSK}: Differentially private optimizer with simplified kalman filter for noise reduction},
  author={Zhang, Xinwei and Bu, Zhiqi and Balle, Borja and Hong, Mingyi and Razaviyayn, Meisam and Mirrokni, Vahab},
  journal={arXiv preprint arXiv:2410.03883},
  year={2024}
}
```

Contributor: Xinwei Zhang. Email: [zhan6234@umn.edu](mailto:zhan6234@umn.edu)
Lines changed: 95 additions & 0 deletions
@@ -0,0 +1,95 @@
from __future__ import annotations

import logging
import math
from typing import Optional

import torch
from opacus.optimizers.adaclipoptimizer import AdaClipDPOptimizer
from torch.optim import Optimizer
from torch.optim.optimizer import required

from .KFoptimizer import KF_DPOptimizer


logger = logging.getLogger(__name__)


class KF_AdaClipDPOptimizer(AdaClipDPOptimizer, KF_DPOptimizer):
    def __init__(
        self,
        optimizer: Optimizer,
        *,
        noise_multiplier: float,
        target_unclipped_quantile: float,
        clipbound_learning_rate: float,
        max_clipbound: float,
        min_clipbound: float,
        unclipped_num_std: float,
        max_grad_norm: float,
        expected_batch_size: Optional[int],
        loss_reduction: str = "mean",
        generator=None,
        secure_mode: bool = False,
        kappa: float = 0.7,
        gamma: float = 0.5,
    ):
        if gamma == 0 or abs(gamma - (1 - kappa) / kappa) < 1e-3:
            gamma = (1 - kappa) / kappa
            self.kf_compute_grad_at_original = False
        else:
            self.scaling_factor = (1 - kappa) / (
                gamma * kappa
            )  # (gamma*kappa+kappa-1)/(1-kappa)
            self.kf_compute_grad_at_original = True
            c = (1 - kappa) / (gamma * kappa)
            norm_factor = math.sqrt(c**2 + (1 - c) ** 2)
            noise_multiplier = noise_multiplier / norm_factor
        super().__init__(
            optimizer,
            noise_multiplier=noise_multiplier,
            max_grad_norm=max_grad_norm,
            expected_batch_size=expected_batch_size,
            loss_reduction=loss_reduction,
            generator=generator,
            secure_mode=secure_mode,
            target_unclipped_quantile=target_unclipped_quantile,
            clipbound_learning_rate=clipbound_learning_rate,
            max_clipbound=max_clipbound,
            min_clipbound=min_clipbound,
            unclipped_num_std=unclipped_num_std,
        )
        self.kappa = kappa
        self.gamma = gamma

    def step(self, closure=required) -> Optional[float]:
        if self.kf_compute_grad_at_original:
            loss = self._compute_two_closure(closure)
        else:
            loss = self._compute_one_closure(closure)

        if self.pre_step():
            tmp_states = []
            first_step = False
            for p in self.params:
                grad = p.grad
                state = self.state[p]
                if "kf_d_t" not in state:
                    state = dict()
                    first_step = True
                    state["kf_d_t"] = torch.zeros_like(p.data).to(p.data)
                    state["kf_m_t"] = grad.clone().to(p.data)
                state["kf_m_t"].lerp_(grad, weight=self.kappa)
                p.grad = state["kf_m_t"].clone().to(p.data)
                state["kf_d_t"] = -p.data.clone().to(p.data)
                if first_step:
                    tmp_states.append(state)
            self.original_optimizer.step()
            for p in self.params:
                if first_step:
                    tmp_state = tmp_states.pop(0)
                    self.state[p]["kf_d_t"] = tmp_state["kf_d_t"]
                    self.state[p]["kf_m_t"] = tmp_state["kf_m_t"]
                    del tmp_state
                self.state[p]["kf_d_t"].add_(p.data, alpha=1.0)
        return loss
Lines changed: 160 additions & 0 deletions
@@ -0,0 +1,160 @@
from __future__ import annotations

from functools import partial
from typing import Callable, List, Optional

import torch
from opacus.optimizers.ddp_perlayeroptimizer import _clip_and_accumulate_parameter
from opacus.optimizers.optimizer import _generate_noise
from torch import nn
from torch.optim import Optimizer

from .KFddpoptimizer import KF_DistributedDPOptimizer
from .KFoptimizer import KF_DPOptimizer
from .KFperlayeroptimizer import KF_DPPerLayerOptimizer


class KF_SimpleDistributedPerLayerOptimizer(
    KF_DPPerLayerOptimizer, KF_DistributedDPOptimizer
):
    def __init__(
        self,
        optimizer: Optimizer,
        *,
        noise_multiplier: float,
        max_grad_norm: float,
        expected_batch_size: Optional[int],
        loss_reduction: str = "mean",
        generator=None,
        secure_mode: bool = False,
        kappa: float = 0.7,
        gamma: float = 0.5,
    ):
        self.rank = torch.distributed.get_rank()
        self.world_size = torch.distributed.get_world_size()

        super().__init__(
            optimizer,
            noise_multiplier=noise_multiplier,
            max_grad_norm=max_grad_norm,
            expected_batch_size=expected_batch_size,
            loss_reduction=loss_reduction,
            generator=generator,
            secure_mode=secure_mode,
            kappa=kappa,
            gamma=gamma,
        )


class KF_DistributedPerLayerOptimizer(KF_DPOptimizer):
    """
    :class:`~opacus.optimizers.optimizer.DPOptimizer` that implements
    per layer clipping strategy and is compatible with distributed data parallel
    """

    def __init__(
        self,
        optimizer: Optimizer,
        *,
        noise_multiplier: float,
        max_grad_norm: List[float],
        expected_batch_size: Optional[int],
        loss_reduction: str = "mean",
        generator=None,
        secure_mode: bool = False,
        kappa: float = 0.7,
        gamma: float = 0.5,
    ):
        self.rank = torch.distributed.get_rank()
        self.world_size = torch.distributed.get_world_size()
        self.max_grad_norms = max_grad_norm
        max_grad_norm = torch.norm(torch.Tensor(self.max_grad_norms), p=2).item()
        super().__init__(
            optimizer,
            noise_multiplier=noise_multiplier,
            max_grad_norm=max_grad_norm,
            expected_batch_size=expected_batch_size,
            loss_reduction=loss_reduction,
            generator=generator,
            secure_mode=secure_mode,
            kappa=kappa,
            gamma=gamma,
        )
        self._register_hooks()

    def _add_noise_parameter(self, p: nn.Parameter):
        """
        We need ``self`` here because of the generator used in secure_mode.
        """
        noise = _generate_noise(
            std=self.noise_multiplier * self.max_grad_norm,
            reference=p.summed_grad,
            generator=None,
            secure_mode=self.secure_mode,
        )
        p.grad = p.summed_grad + noise

    @property
    def accumulated_iterations(self) -> int:
        return max([p.accumulated_iterations for p in self.params])

    def _scale_grad_parameter(self, p: nn.Parameter):
        if not hasattr(p, "accumulated_iterations"):
            p.accumulated_iterations = 0
        p.accumulated_iterations += 1
        if self.loss_reduction == "mean":
            p.grad /= (
                self.expected_batch_size * p.accumulated_iterations * self.world_size
            )

    def clip_and_accumulate(self):
        raise NotImplementedError(
            "Clip and accumulate is added per layer in DPDDP Per Layer."
        )

    def add_noise(self):
        raise NotImplementedError("Noise is added per layer in DPDDP Per Layer.")

    def pre_step(
        self, closure: Optional[Callable[[], float]] = None
    ) -> Optional[float]:
        if self._check_skip_next_step():
            self._is_last_step_skipped = True
            return False

        if self.step_hook:
            self.step_hook(self)

        for p in self.params:
            p.accumulated_iterations = 0

        self._is_last_step_skipped = False
        return True

    def _ddp_per_layer_hook(
        self, p: nn.Parameter, max_grad_norm: float, _: torch.Tensor
    ):
        _clip_and_accumulate_parameter(p, max_grad_norm)
        # Equivalent to _check_skip_next_step but without popping because it has to be done for every parameter p
        if self._check_skip_next_step(pop_next=False):
            return

        if self.rank == 0:
            self._add_noise_parameter(p)
        else:
            p.grad = p.summed_grad
        self._scale_grad_parameter(p)

        return p.grad

    def _register_hooks(self):
        for p, max_grad_norm in zip(self.params, self.max_grad_norms):
            if not p.requires_grad:
                continue

            if not hasattr(p, "ddp_hooks"):
                p.ddp_hooks = []

            p.ddp_hooks.append(
                p.register_hook(partial(self._ddp_per_layer_hook, p, max_grad_norm))
            )
