Skip to content

Commit d41bf27

Browse files
authored
feat: non-blocking rate limiting on refresh. (#1574)
This further improve the efficient use of futures in the refresh by using the ScheduledExecutor's delay function to rate limit instead of blocking the ScheduleExecutor's execution thread.
1 parent 9780562 commit d41bf27

File tree

7 files changed

+258
-45
lines changed

7 files changed

+258
-45
lines changed

core/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,13 @@
143143
<version>1.1.5</version>
144144
<scope>test</scope>
145145
</dependency>
146+
<!-- Using this for the deterministic scheduler class only -->
147+
<dependency>
148+
<groupId>org.jmock</groupId>
149+
<artifactId>jmock</artifactId>
150+
<version>2.12.0</version>
151+
<scope>test</scope>
152+
</dependency>
146153

147154
<dependency>
148155
<groupId>com.github.jnr</groupId>
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.sql.core;
18+
19+
import com.google.common.util.concurrent.Futures;
20+
import com.google.common.util.concurrent.ListenableFuture;
21+
import java.util.concurrent.ScheduledExecutorService;
22+
import java.util.concurrent.TimeUnit;
23+
import java.util.function.LongSupplier;
24+
25+
/**
26+
* A simple, constant-time rate limit calculator. Ensures that there is always at least
27+
* delayBetweenAttempts milliseconds between attempts.
28+
*/
29+
class AsyncRateLimiter {
30+
private long nextOperationTimestamp;
31+
private final long delayBetweenAttempts;
32+
private final LongSupplier currentTimestampMs;
33+
34+
/**
35+
* Creates a new AsyncRateLimiter uses the System.currentTimeMillis() as the current time.
36+
*
37+
* @param delayBetweenAttempts the required delay in milliseconds between attempts.
38+
*/
39+
AsyncRateLimiter(long delayBetweenAttempts) {
40+
this(delayBetweenAttempts, System::currentTimeMillis);
41+
}
42+
43+
/**
44+
* Creates a new AsyncRateLimiter which uses a custom function for the current time.
45+
*
46+
* @param delayBetweenAttempts the required delay in milliseconds between attempts.
47+
* @param currentTimestampMs A function that supplies the current time in milliseconds
48+
*/
49+
AsyncRateLimiter(long delayBetweenAttempts, LongSupplier currentTimestampMs) {
50+
this.delayBetweenAttempts = delayBetweenAttempts;
51+
this.currentTimestampMs = currentTimestampMs;
52+
}
53+
54+
/**
55+
* Returns the number of milliseconds to delay before proceeding with the rate limited operation.
56+
* If this returns > 0, the operation must call "acquire" again until it returns 0.
57+
*/
58+
private synchronized long nextDelayMs(long nowTimestampMs) {
59+
// allow exactly 1 operation to pass the timestamp.
60+
if (nextOperationTimestamp <= nowTimestampMs) {
61+
nextOperationTimestamp = nowTimestampMs + delayBetweenAttempts;
62+
return 0;
63+
}
64+
65+
return nextOperationTimestamp - nowTimestampMs;
66+
}
67+
68+
/**
69+
* Returns a future that will be done when the rate limit has been acquired.
70+
*
71+
* @param executor the executor to use to schedule future checks for available rate limits.
72+
*/
73+
public ListenableFuture<?> acquireAsync(ScheduledExecutorService executor) {
74+
long limit = this.nextDelayMs(currentTimestampMs.getAsLong());
75+
if (limit > 0) {
76+
return Futures.scheduleAsync(
77+
() -> this.acquireAsync(executor), limit, TimeUnit.MILLISECONDS, executor);
78+
}
79+
return Futures.immediateFuture(null);
80+
}
81+
}

core/src/main/java/com/google/cloud/sql/core/CloudSqlInstance.java

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.google.common.util.concurrent.Futures;
2424
import com.google.common.util.concurrent.ListenableFuture;
2525
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
26-
import com.google.common.util.concurrent.RateLimiter;
2726
import com.google.errorprone.annotations.concurrent.GuardedBy;
2827
import java.io.IOException;
2928
import java.security.KeyPair;
@@ -56,9 +55,7 @@ class CloudSqlInstance {
5655
private final CloudSqlInstanceName instanceName;
5756
private final ListenableFuture<KeyPair> keyPair;
5857
private final Object instanceDataGuard = new Object();
59-
60-
@SuppressWarnings("UnstableApiUsage")
61-
private final RateLimiter forcedRenewRateLimiter;
58+
private final AsyncRateLimiter rateLimiter;
6259

6360
private final RefreshCalculator refreshCalculator = new RefreshCalculator();
6461

@@ -89,13 +86,15 @@ class CloudSqlInstance {
8986
CredentialFactory tokenSourceFactory,
9087
ListeningScheduledExecutorService executor,
9188
ListenableFuture<KeyPair> keyPair,
92-
@SuppressWarnings("UnstableApiUsage") RateLimiter forcedRenewRateLimiter) {
89+
long minRefreshDelayMs) {
9390
this.instanceName = new CloudSqlInstanceName(connectionName);
9491
this.instanceDataSupplier = instanceDataSupplier;
9592
this.authType = authType;
9693
this.executor = executor;
9794
this.keyPair = keyPair;
98-
this.forcedRenewRateLimiter = forcedRenewRateLimiter;
95+
96+
// convert requests/second into milliseconds between requests.
97+
this.rateLimiter = new AsyncRateLimiter(minRefreshDelayMs);
9998

10099
if (authType == AuthType.IAM) {
101100
this.accessTokenSupplier = new DefaultAccessTokenSupplier(tokenSourceFactory);
@@ -223,25 +222,19 @@ private ListenableFuture<InstanceData> startRefreshAttempt() {
223222
refreshRunning = true;
224223
}
225224

226-
// To avoid unreasonable SQL Admin API usage, use a rate limit to throttle our usage.
227-
ListenableFuture<?> rateLimit =
228-
executor.submit(
229-
() -> {
230-
logger.fine(
231-
String.format(
232-
"[%s] Refresh Operation: Acquiring rate limiter permit.", instanceName));
233-
//noinspection UnstableApiUsage
234-
forcedRenewRateLimiter.acquire();
235-
logger.fine(
236-
String.format(
237-
"[%s] Refresh Operation: Acquired rate limiter permit. Starting refresh...",
238-
instanceName));
239-
},
240-
executor);
225+
logger.fine(
226+
String.format("[%s] Refresh Operation: Acquiring rate limiter permit.", instanceName));
227+
ListenableFuture<?> delay = rateLimiter.acquireAsync(executor);
228+
delay.addListener(
229+
() ->
230+
logger.fine(
231+
String.format(
232+
"[%s] Refresh Operation: Rate limiter permit acquired.", instanceName)),
233+
executor);
241234

242235
// Once rate limiter is done, attempt to getInstanceData.
243236
ListenableFuture<InstanceData> dataFuture =
244-
Futures.whenAllComplete(rateLimit)
237+
Futures.whenAllComplete(delay)
245238
.callAsync(
246239
() ->
247240
instanceDataSupplier.getInstanceData(

core/src/main/java/com/google/cloud/sql/core/CoreSocketFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import com.google.common.util.concurrent.ListenableFuture;
2626
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
2727
import com.google.common.util.concurrent.MoreExecutors;
28-
import com.google.common.util.concurrent.RateLimiter;
2928
import java.io.File;
3029
import java.io.IOException;
3130
import java.net.InetSocketAddress;
@@ -108,6 +107,7 @@ public final class CoreSocketFactory {
108107
private static final int RSA_KEY_SIZE = 2048;
109108
private static final List<String> userAgents = new ArrayList<>();
110109
private static final String version = getVersion();
110+
private static final long MIN_REFRESH_DELAY_MS = 30000; // Minimum 30 seconds between refresh.
111111
private static CoreSocketFactory coreSocketFactory;
112112
private final ListenableFuture<KeyPair> localKeyPair;
113113
private final ConcurrentHashMap<String, CloudSqlInstance> instances = new ConcurrentHashMap<>();
@@ -392,6 +392,6 @@ private CloudSqlInstance apiFetcher(ConnectionConfig config) {
392392
instanceCredentialFactory,
393393
executor,
394394
localKeyPair,
395-
RateLimiter.create(1.0 / 30.0)); // 1 refresh attempt every 30 seconds
395+
MIN_REFRESH_DELAY_MS);
396396
}
397397
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.sql.core;
18+
19+
import static com.google.common.truth.Truth.assertThat;
20+
21+
import com.google.common.util.concurrent.ListenableFuture;
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicLong;
26+
import org.jmock.lib.concurrent.DeterministicScheduler;
27+
import org.junit.Test;
28+
29+
public class AsyncRateLimiterTest {
30+
31+
@Test
32+
public void firstCallShouldReturnNoDelay() {
33+
RateLimiterTestHarness th = new RateLimiterTestHarness(100);
34+
ListenableFuture<?> f = th.rateLimiter.acquireAsync(th.ex);
35+
assertThat(f.isDone()).isTrue();
36+
}
37+
38+
@Test
39+
public void subsequentCallsShouldReturnDelays() {
40+
RateLimiterTestHarness th = new RateLimiterTestHarness(100);
41+
ListenableFuture<?> f1 = th.rateLimiter.acquireAsync(th.ex);
42+
ListenableFuture<?> f2 = th.rateLimiter.acquireAsync(th.ex);
43+
// When calls occur at the same timestamp, one will return 0 and
44+
// the other will return the full delay.
45+
assertThat(f1.isDone()).isTrue();
46+
assertThat(f2.isDone()).isFalse();
47+
}
48+
49+
@Test
50+
public void delayBeforeExpiration() throws InterruptedException {
51+
RateLimiterTestHarness th = new RateLimiterTestHarness(100);
52+
ListenableFuture<?> f1 = th.rateLimiter.acquireAsync(th.ex);
53+
assertThat(f1.isDone()).isTrue();
54+
55+
// Before the expiration, isDone should be false
56+
th.tickMs(99);
57+
ListenableFuture<?> f2 = th.rateLimiter.acquireAsync(th.ex);
58+
assertThat(f2.isDone()).isFalse();
59+
60+
// Exactly at the expiration, isDone should be true
61+
th.tickMs(1);
62+
assertThat(f2.isDone()).isTrue();
63+
}
64+
65+
@Test
66+
public void noDelayExactlyAtExpiration() throws InterruptedException {
67+
RateLimiterTestHarness th = new RateLimiterTestHarness(100);
68+
ListenableFuture<?> f1 = th.rateLimiter.acquireAsync(th.ex);
69+
ListenableFuture<?> f2 = th.rateLimiter.acquireAsync(th.ex);
70+
assertThat(f1.isDone()).isTrue();
71+
assertThat(f2.isDone()).isFalse();
72+
73+
th.tickMs(50);
74+
assertThat(f2.isDone()).isFalse();
75+
76+
th.tickMs(100);
77+
assertThat(f2.isDone()).isTrue();
78+
}
79+
80+
@Test
81+
public void noDelayAfterExpiration() throws InterruptedException {
82+
RateLimiterTestHarness th = new RateLimiterTestHarness(100);
83+
ListenableFuture<?> f1 = th.rateLimiter.acquireAsync(th.ex);
84+
assertThat(f1.isDone()).isTrue();
85+
86+
th.tickMs(101);
87+
ListenableFuture<?> f2 = th.rateLimiter.acquireAsync(th.ex);
88+
assertThat(f2.isDone()).isTrue();
89+
}
90+
91+
@Test
92+
public void testAsyncWorks() {
93+
final long delay = 100;
94+
95+
RateLimiterTestHarness th = new RateLimiterTestHarness(delay);
96+
97+
List<ListenableFuture<?>> futures = new ArrayList<>();
98+
for (int i = 0; i < 3; i++) {
99+
futures.add(th.rateLimiter.acquireAsync(th.ex));
100+
}
101+
102+
// First attempt happens without any delay, because nothing is run yet.
103+
assertThat(futures.stream().mapToInt(f -> f.isDone() ? 1 : 0).sum()).isEqualTo(1);
104+
105+
// Tick forward less than the delay.
106+
th.tickMs(50);
107+
108+
// When all futures are evaluated again immediately, still only 1 request has finished.
109+
// because not enough time has elapsed.
110+
assertThat(futures.stream().mapToInt(f -> f.isDone() ? 1 : 0).sum()).isEqualTo(1);
111+
112+
// Tick forward more than the delay. Now 2 attempts should have finished.
113+
th.tickMs(100);
114+
115+
assertThat(futures.stream().mapToInt(f -> f.isDone() ? 1 : 0).sum()).isEqualTo(2);
116+
117+
// Tick forward more than the delay. Now 3 attempts should have finished.
118+
th.tickMs(100);
119+
assertThat(futures.stream().mapToInt(f -> f.isDone() ? 1 : 0).sum()).isEqualTo(3);
120+
}
121+
122+
private static class RateLimiterTestHarness {
123+
124+
final AtomicLong now = new AtomicLong(System.currentTimeMillis());
125+
final DeterministicScheduler ex = new DeterministicScheduler();
126+
final AsyncRateLimiter rateLimiter;
127+
128+
RateLimiterTestHarness(long delay) {
129+
rateLimiter = new AsyncRateLimiter(delay, now::get);
130+
}
131+
132+
private void tickMs(long ms) {
133+
now.addAndGet(ms);
134+
ex.tick(ms, TimeUnit.MILLISECONDS);
135+
}
136+
}
137+
}

core/src/test/java/com/google/cloud/sql/core/CloudSqlInstanceConcurrencyTest.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import com.google.common.util.concurrent.Futures;
2626
import com.google.common.util.concurrent.ListenableFuture;
2727
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
28-
import com.google.common.util.concurrent.RateLimiter;
2928
import java.io.IOException;
3029
import java.security.KeyPair;
3130
import java.util.ArrayList;
@@ -56,6 +55,7 @@ public void initialize(HttpRequest var1) throws IOException {
5655

5756
@Test(timeout = 20000) // 45 seconds timeout in case of deadlock
5857
public void testForceRefreshDoesNotCauseADeadlockOrBrokenRefreshLoop() throws Exception {
58+
final long refreshDelayMs = 50;
5959
MockAdminApi mockAdminApi = new MockAdminApi();
6060
ListenableFuture<KeyPair> keyPairFuture =
6161
Futures.immediateFuture(mockAdminApi.getClientKeyPair());
@@ -74,7 +74,7 @@ public void testForceRefreshDoesNotCauseADeadlockOrBrokenRefreshLoop() throws Ex
7474
new TestCredentialFactory(),
7575
executor,
7676
keyPairFuture,
77-
newRateLimiter()));
77+
refreshDelayMs));
7878
}
7979

8080
// Get SSL Data for each instance, forcing the first refresh to complete.
@@ -130,8 +130,4 @@ private Thread startForceRefreshThread(CloudSqlInstance inst) {
130130
t.start();
131131
return t;
132132
}
133-
134-
private RateLimiter newRateLimiter() {
135-
return RateLimiter.create(20.0); // 20/sec = every 50 ms
136-
}
137133
}

0 commit comments

Comments
 (0)