11
11
import fixture .gcs .FakeOAuth2HttpHandler ;
12
12
import fixture .gcs .GoogleCloudStorageHttpHandler ;
13
13
14
+ import com .google .api .client .http .HttpExecuteInterceptor ;
15
+ import com .google .api .client .http .HttpRequestInitializer ;
14
16
import com .google .api .gax .retrying .RetrySettings ;
17
+ import com .google .cloud .ServiceOptions ;
15
18
import com .google .cloud .http .HttpTransportOptions ;
16
19
import com .google .cloud .storage .StorageException ;
17
20
import com .google .cloud .storage .StorageOptions ;
18
- import com .google .cloud .storage .StorageRetryStrategy ;
19
21
import com .sun .net .httpserver .HttpHandler ;
20
22
21
23
import org .apache .http .HttpStatus ;
61
63
import java .util .Objects ;
62
64
import java .util .Optional ;
63
65
import java .util .Queue ;
66
+ import java .util .concurrent .ConcurrentHashMap ;
64
67
import java .util .concurrent .ConcurrentLinkedQueue ;
65
68
import java .util .concurrent .atomic .AtomicBoolean ;
66
69
import java .util .concurrent .atomic .AtomicInteger ;
89
92
@ SuppressForbidden (reason = "use a http server" )
90
93
public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobContainerRetriesTestCase {
91
94
95
+ private final Map <String , AtomicInteger > requestCounters = new ConcurrentHashMap <>();
96
+ private String endpointUrlOverride ;
97
+
92
98
private String httpServerUrl () {
93
99
assertThat (httpServer , notNullValue ());
94
100
InetSocketAddress address = httpServer .getAddress ();
95
101
return "http://" + InetAddresses .toUriString (address .getAddress ()) + ":" + address .getPort ();
96
102
}
97
103
104
+ private String getEndpointUrl () {
105
+ return endpointUrlOverride != null ? endpointUrlOverride : httpServerUrl ();
106
+ }
107
+
98
108
@ Override
99
109
protected String downloadStorageEndpoint (BlobContainer container , String blob ) {
100
110
return "/download/storage/v1/b/bucket/o/" + container .path ().buildAsString () + blob ;
@@ -120,7 +130,7 @@ protected BlobContainer createBlobContainer(
120
130
) {
121
131
final Settings .Builder clientSettings = Settings .builder ();
122
132
final String client = randomAlphaOfLength (5 ).toLowerCase (Locale .ROOT );
123
- clientSettings .put (ENDPOINT_SETTING .getConcreteSettingForNamespace (client ).getKey (), httpServerUrl ());
133
+ clientSettings .put (ENDPOINT_SETTING .getConcreteSettingForNamespace (client ).getKey (), getEndpointUrl ());
124
134
clientSettings .put (TOKEN_URI_SETTING .getConcreteSettingForNamespace (client ).getKey (), httpServerUrl () + "/token" );
125
135
if (readTimeout != null ) {
126
136
clientSettings .put (READ_TIMEOUT_SETTING .getConcreteSettingForNamespace (client ).getKey (), readTimeout );
@@ -136,8 +146,33 @@ StorageOptions createStorageOptions(
136
146
final GoogleCloudStorageClientSettings gcsClientSettings ,
137
147
final HttpTransportOptions httpTransportOptions
138
148
) {
139
- StorageOptions options = super .createStorageOptions (gcsClientSettings , httpTransportOptions );
140
- RetrySettings .Builder retrySettingsBuilder = RetrySettings .newBuilder ()
149
+ final HttpTransportOptions requestCountingHttpTransportOptions = new HttpTransportOptions (
150
+ HttpTransportOptions .newBuilder ()
151
+ .setConnectTimeout (httpTransportOptions .getConnectTimeout ())
152
+ .setHttpTransportFactory (httpTransportOptions .getHttpTransportFactory ())
153
+ .setReadTimeout (httpTransportOptions .getReadTimeout ())
154
+ ) {
155
+ @ Override
156
+ public HttpRequestInitializer getHttpRequestInitializer (ServiceOptions <?, ?> serviceOptions ) {
157
+ // Add initializer/interceptor without interfering with any pre-existing ones
158
+ HttpRequestInitializer httpRequestInitializer = super .getHttpRequestInitializer (serviceOptions );
159
+ return request -> {
160
+ if (httpRequestInitializer != null ) {
161
+ httpRequestInitializer .initialize (request );
162
+ }
163
+ HttpExecuteInterceptor interceptor = request .getInterceptor ();
164
+ request .setInterceptor (req -> {
165
+ if (interceptor != null ) {
166
+ interceptor .intercept (req );
167
+ }
168
+ requestCounters .computeIfAbsent (request .getUrl ().getRawPath (), (url ) -> new AtomicInteger ())
169
+ .incrementAndGet ();
170
+ });
171
+ };
172
+ }
173
+ };
174
+ final StorageOptions options = super .createStorageOptions (gcsClientSettings , requestCountingHttpTransportOptions );
175
+ final RetrySettings .Builder retrySettingsBuilder = RetrySettings .newBuilder ()
141
176
.setTotalTimeout (options .getRetrySettings ().getTotalTimeout ())
142
177
.setInitialRetryDelay (Duration .ofMillis (10L ))
143
178
.setRetryDelayMultiplier (1.0d )
@@ -150,7 +185,7 @@ StorageOptions createStorageOptions(
150
185
retrySettingsBuilder .setMaxAttempts (maxRetries + 1 );
151
186
}
152
187
return options .toBuilder ()
153
- .setStorageRetryStrategy (StorageRetryStrategy . getLegacyStorageRetryStrategy ())
188
+ .setStorageRetryStrategy (getRetryStrategy ())
154
189
.setHost (options .getHost ())
155
190
.setCredentials (options .getCredentials ())
156
191
.setRetrySettings (retrySettingsBuilder .build ())
@@ -173,6 +208,25 @@ StorageOptions createStorageOptions(
173
208
return new GoogleCloudStorageBlobContainer (randomBoolean () ? BlobPath .EMPTY : BlobPath .EMPTY .add ("foo" ), blobStore );
174
209
}
175
210
211
+ public void testShouldRetryOnConnectionRefused () {
212
+ // port 1 should never be open
213
+ endpointUrlOverride = "http://127.0.0.1:1" ;
214
+ executeListBlobsAndAssertRetries ();
215
+ }
216
+
217
+ public void testShouldRetryOnUnresolvableHost () {
218
+ // https://www.rfc-editor.org/rfc/rfc2606.html#page-2
219
+ endpointUrlOverride = "http://unresolvable.invalid" ;
220
+ executeListBlobsAndAssertRetries ();
221
+ }
222
+
223
+ private void executeListBlobsAndAssertRetries () {
224
+ final int maxRetries = randomIntBetween (3 , 5 );
225
+ final BlobContainer blobContainer = createBlobContainer (maxRetries , null , null , null , null );
226
+ expectThrows (StorageException .class , () -> blobContainer .listBlobs (randomPurpose ()));
227
+ assertEquals (maxRetries + 1 , requestCounters .get ("/storage/v1/b/bucket/o" ).get ());
228
+ }
229
+
176
230
public void testReadLargeBlobWithRetries () throws Exception {
177
231
final int maxRetries = randomIntBetween (2 , 10 );
178
232
final AtomicInteger countDown = new AtomicInteger (maxRetries );
0 commit comments