Skip to content

Commit 851c8df

Browse files
committed
Move plain socket create and connect operations to HttpClientConnectionOperator
1 parent f4f5f73 commit 851c8df

File tree

6 files changed

+210
-240
lines changed

6 files changed

+210
-240
lines changed

httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/DefaultHttpClientConnectionOperator.java

Lines changed: 58 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -42,19 +42,21 @@
4242
import org.apache.hc.client5.http.UnsupportedSchemeException;
4343
import org.apache.hc.client5.http.impl.ConnPoolSupport;
4444
import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
45+
import org.apache.hc.client5.http.io.DetachedSocketFactory;
4546
import org.apache.hc.client5.http.io.HttpClientConnectionOperator;
4647
import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
47-
import org.apache.hc.client5.http.protocol.HttpClientContext;
4848
import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
4949
import org.apache.hc.client5.http.socket.LayeredConnectionSocketFactory;
5050
import org.apache.hc.core5.annotation.Contract;
5151
import org.apache.hc.core5.annotation.Internal;
5252
import org.apache.hc.core5.annotation.ThreadingBehavior;
5353
import org.apache.hc.core5.http.ConnectionClosedException;
5454
import org.apache.hc.core5.http.HttpHost;
55+
import org.apache.hc.core5.http.URIScheme;
5556
import org.apache.hc.core5.http.config.Lookup;
5657
import org.apache.hc.core5.http.io.SocketConfig;
5758
import org.apache.hc.core5.http.protocol.HttpContext;
59+
import org.apache.hc.core5.io.Closer;
5860
import org.apache.hc.core5.util.Args;
5961
import org.apache.hc.core5.util.TimeValue;
6062
import org.apache.hc.core5.util.Timeout;
@@ -72,35 +74,41 @@
7274
@Contract(threading = ThreadingBehavior.STATELESS)
7375
public class DefaultHttpClientConnectionOperator implements HttpClientConnectionOperator {
7476

75-
static final String SOCKET_FACTORY_REGISTRY = "http.socket-factory-registry";
76-
7777
private static final Logger LOG = LoggerFactory.getLogger(DefaultHttpClientConnectionOperator.class);
7878

79+
static final DetachedSocketFactory PLAIN_SOCKET_FACTORY = new DetachedSocketFactory() {
80+
81+
@Override
82+
public Socket create(final Proxy socksProxy) throws IOException {
83+
return socksProxy == null ? new Socket() : new Socket(socksProxy);
84+
}
85+
86+
};
87+
88+
private final DetachedSocketFactory detachedSocketFactory;
7989
private final Lookup<ConnectionSocketFactory> socketFactoryRegistry;
8090
private final SchemePortResolver schemePortResolver;
8191
private final DnsResolver dnsResolver;
8292

8393
public DefaultHttpClientConnectionOperator(
94+
final DetachedSocketFactory detachedSocketFactory,
8495
final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
8596
final SchemePortResolver schemePortResolver,
8697
final DnsResolver dnsResolver) {
8798
super();
88-
Args.notNull(socketFactoryRegistry, "Socket factory registry");
89-
this.socketFactoryRegistry = socketFactoryRegistry;
99+
this.detachedSocketFactory = Args.notNull(detachedSocketFactory, "Plain socket factory");
100+
this.socketFactoryRegistry = Args.notNull(socketFactoryRegistry, "Socket factory registry");
90101
this.schemePortResolver = schemePortResolver != null ? schemePortResolver :
91102
DefaultSchemePortResolver.INSTANCE;
92103
this.dnsResolver = dnsResolver != null ? dnsResolver :
93104
SystemDefaultDnsResolver.INSTANCE;
94105
}
95106

96-
@SuppressWarnings("unchecked")
97-
private Lookup<ConnectionSocketFactory> getSocketFactoryRegistry(final HttpContext context) {
98-
Lookup<ConnectionSocketFactory> reg = (Lookup<ConnectionSocketFactory>) context.getAttribute(
99-
SOCKET_FACTORY_REGISTRY);
100-
if (reg == null) {
101-
reg = this.socketFactoryRegistry;
102-
}
103-
return reg;
107+
public DefaultHttpClientConnectionOperator(
108+
final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
109+
final SchemePortResolver schemePortResolver,
110+
final DnsResolver dnsResolver) {
111+
this(PLAIN_SOCKET_FACTORY, socketFactoryRegistry, schemePortResolver, dnsResolver);
104112
}
105113

106114
@Override
@@ -128,11 +136,6 @@ public void connect(
128136
Args.notNull(host, "Host");
129137
Args.notNull(socketConfig, "Socket config");
130138
Args.notNull(context, "Context");
131-
final Lookup<ConnectionSocketFactory> registry = getSocketFactoryRegistry(context);
132-
final ConnectionSocketFactory sf = registry.lookup(host.getSchemeName());
133-
if (sf == null) {
134-
throw new UnsupportedSchemeException(host.getSchemeName() + " protocol is not supported");
135-
}
136139
final InetAddress[] remoteAddresses;
137140
if (host.getAddress() != null) {
138141
remoteAddresses = new InetAddress[] { host.getAddress() };
@@ -154,47 +157,59 @@ public void connect(
154157

155158
final Timeout soTimeout = socketConfig.getSoTimeout();
156159
final SocketAddress socksProxyAddress = socketConfig.getSocksProxyAddress();
157-
final Proxy proxy = socksProxyAddress != null ? new Proxy(Proxy.Type.SOCKS, socksProxyAddress) : null;
160+
final Proxy socksProxy = socksProxyAddress != null ? new Proxy(Proxy.Type.SOCKS, socksProxyAddress) : null;
158161
final int port = this.schemePortResolver.resolve(host);
159162
for (int i = 0; i < remoteAddresses.length; i++) {
160163
final InetAddress address = remoteAddresses[i];
161164
final boolean last = i == remoteAddresses.length - 1;
162-
163-
Socket sock = sf.createSocket(proxy, context);
164-
if (soTimeout != null) {
165-
sock.setSoTimeout(soTimeout.toMillisecondsIntBound());
166-
}
167-
sock.setReuseAddress(socketConfig.isSoReuseAddress());
168-
sock.setTcpNoDelay(socketConfig.isTcpNoDelay());
169-
sock.setKeepAlive(socketConfig.isSoKeepAlive());
170-
if (socketConfig.getRcvBufSize() > 0) {
171-
sock.setReceiveBufferSize(socketConfig.getRcvBufSize());
172-
}
173-
if (socketConfig.getSndBufSize() > 0) {
174-
sock.setSendBufferSize(socketConfig.getSndBufSize());
175-
}
176-
177-
final int linger = socketConfig.getSoLinger().toMillisecondsIntBound();
178-
if (linger >= 0) {
179-
sock.setSoLinger(true, linger);
180-
}
181-
conn.bind(sock);
182-
183165
final InetSocketAddress remoteAddress = new InetSocketAddress(address, port);
184166
if (LOG.isDebugEnabled()) {
185167
LOG.debug("{}:{} connecting {}->{} ({})",
186168
host.getHostName(), host.getPort(), localAddress, remoteAddress, connectTimeout);
187169
}
170+
final Socket socket = detachedSocketFactory.create(socksProxy);
188171
try {
189-
sock = sf.connectSocket(sock, host, remoteAddress, localAddress, connectTimeout, attachment, context);
190-
conn.bind(sock);
172+
conn.bind(socket);
173+
if (soTimeout != null) {
174+
socket.setSoTimeout(soTimeout.toMillisecondsIntBound());
175+
}
176+
socket.setReuseAddress(socketConfig.isSoReuseAddress());
177+
socket.setTcpNoDelay(socketConfig.isTcpNoDelay());
178+
socket.setKeepAlive(socketConfig.isSoKeepAlive());
179+
if (socketConfig.getRcvBufSize() > 0) {
180+
socket.setReceiveBufferSize(socketConfig.getRcvBufSize());
181+
}
182+
if (socketConfig.getSndBufSize() > 0) {
183+
socket.setSendBufferSize(socketConfig.getSndBufSize());
184+
}
185+
186+
final int linger = socketConfig.getSoLinger().toMillisecondsIntBound();
187+
if (linger >= 0) {
188+
socket.setSoLinger(true, linger);
189+
}
190+
191+
if (localAddress != null) {
192+
socket.bind(localAddress);
193+
}
194+
socket.connect(remoteAddress, TimeValue.isPositive(connectTimeout) ? connectTimeout.toMillisecondsIntBound() : 0);
195+
conn.bind(socket);
191196
conn.setSocketTimeout(soTimeout);
192197
if (LOG.isDebugEnabled()) {
193198
LOG.debug("{}:{} connected {}->{} as {}",
194199
host.getHostName(), host.getPort(), localAddress, remoteAddress, ConnPoolSupport.getId(conn));
195200
}
201+
final ConnectionSocketFactory connectionSocketFactory = socketFactoryRegistry != null ? socketFactoryRegistry.lookup(host.getSchemeName()) : null;
202+
if (connectionSocketFactory instanceof LayeredConnectionSocketFactory && URIScheme.HTTPS.same(host.getSchemeName())) {
203+
final LayeredConnectionSocketFactory lsf = (LayeredConnectionSocketFactory) connectionSocketFactory;
204+
final Socket upgradedSocket = lsf.createLayeredSocket(socket, host.getHostName(), port, attachment, context);
205+
conn.bind(upgradedSocket);
206+
}
196207
return;
208+
} catch (final RuntimeException ex) {
209+
Closer.closeQuietly(socket);
210+
throw ex;
197211
} catch (final IOException ex) {
212+
Closer.closeQuietly(socket);
198213
if (last) {
199214
if (LOG.isDebugEnabled()) {
200215
LOG.debug("{}:{} connection to {} failed ({}); terminating operation",
@@ -225,9 +240,7 @@ public void upgrade(
225240
final HttpHost host,
226241
final Object attachment,
227242
final HttpContext context) throws IOException {
228-
final HttpClientContext clientContext = HttpClientContext.adapt(context);
229-
final Lookup<ConnectionSocketFactory> registry = getSocketFactoryRegistry(clientContext);
230-
final ConnectionSocketFactory sf = registry.lookup(host.getSchemeName());
243+
final ConnectionSocketFactory sf = socketFactoryRegistry.lookup(host.getSchemeName());
231244
if (sf == null) {
232245
throw new UnsupportedSchemeException(host.getSchemeName() +
233246
" protocol is not supported");

httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/DefaultAsyncClientConnectionOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ public Future<ManagedAsyncClientConnection> connect(
9595
final ComplexFuture<ManagedAsyncClientConnection> future = new ComplexFuture<>(callback);
9696
final HttpHost remoteEndpoint = RoutingSupport.normalize(host, schemePortResolver);
9797
final InetAddress remoteAddress = host.getAddress();
98-
final TlsStrategy tlsStrategy = tlsStrategyLookup != null ? tlsStrategyLookup.lookup(host.getSchemeName()) : null;
9998
final TlsConfig tlsConfig = attachment instanceof TlsConfig ? (TlsConfig) attachment : TlsConfig.DEFAULT;
10099
final Future<IOSession> sessionFuture = sessionRequester.connect(
101100
connectionInitiator,
@@ -109,6 +108,7 @@ public Future<ManagedAsyncClientConnection> connect(
109108
@Override
110109
public void completed(final IOSession session) {
111110
final DefaultManagedAsyncClientConnection connection = new DefaultManagedAsyncClientConnection(session);
111+
final TlsStrategy tlsStrategy = tlsStrategyLookup != null ? tlsStrategyLookup.lookup(host.getSchemeName()) : null;
112112
if (tlsStrategy != null && URIScheme.HTTPS.same(host.getSchemeName())) {
113113
try {
114114
final Timeout socketTimeout = connection.getSocketTimeout();
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
28+
package org.apache.hc.client5.http.io;
29+
30+
import java.io.IOException;
31+
import java.net.Proxy;
32+
import java.net.Socket;
33+
34+
import org.apache.hc.core5.annotation.Contract;
35+
import org.apache.hc.core5.annotation.Internal;
36+
import org.apache.hc.core5.annotation.ThreadingBehavior;
37+
38+
/**
39+
* @since 5.4
40+
*/
41+
@Internal
42+
@Contract(threading = ThreadingBehavior.STATELESS)
43+
public interface DetachedSocketFactory {
44+
45+
Socket create(Proxy proxy) throws IOException;
46+
47+
}

httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestBasicHttpClientConnectionManager.java

Lines changed: 21 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.hc.client5.http.config.ConnectionConfig;
3939
import org.apache.hc.client5.http.config.TlsConfig;
4040
import org.apache.hc.client5.http.io.ConnectionEndpoint;
41+
import org.apache.hc.client5.http.io.DetachedSocketFactory;
4142
import org.apache.hc.client5.http.io.LeaseRequest;
4243
import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
4344
import org.apache.hc.client5.http.protocol.HttpClientContext;
@@ -57,7 +58,6 @@
5758
import org.mockito.Mockito;
5859
import org.mockito.MockitoAnnotations;
5960

60-
@SuppressWarnings({"boxing","static-access"}) // test code
6161
public class TestBasicHttpClientConnectionManager {
6262

6363
@Mock
@@ -67,7 +67,7 @@ public class TestBasicHttpClientConnectionManager {
6767
@Mock
6868
private Lookup<ConnectionSocketFactory> socketFactoryRegistry;
6969
@Mock
70-
private ConnectionSocketFactory plainSocketFactory;
70+
private DetachedSocketFactory detachedSocketFactory;
7171
@Mock
7272
private LayeredConnectionSocketFactory sslSocketFactory;
7373
@Mock
@@ -82,8 +82,9 @@ public class TestBasicHttpClientConnectionManager {
8282
@BeforeEach
8383
public void setup() throws Exception {
8484
MockitoAnnotations.openMocks(this);
85-
mgr = new BasicHttpClientConnectionManager(
86-
socketFactoryRegistry, connFactory, schemePortResolver, dnsResolver);
85+
mgr = new BasicHttpClientConnectionManager(new DefaultHttpClientConnectionOperator(
86+
detachedSocketFactory, socketFactoryRegistry, schemePortResolver, dnsResolver),
87+
connFactory);
8788
}
8889

8990
@Test
@@ -382,44 +383,31 @@ public void testTargetConnect() throws Exception {
382383

383384
Mockito.when(dnsResolver.resolve("somehost")).thenReturn(new InetAddress[] {remote});
384385
Mockito.when(schemePortResolver.resolve(target)).thenReturn(8443);
385-
Mockito.when(socketFactoryRegistry.lookup("https")).thenReturn(plainSocketFactory);
386-
Mockito.when(plainSocketFactory.createSocket(Mockito.any(), Mockito.any())).thenReturn(socket);
387-
Mockito.when(plainSocketFactory.connectSocket(
388-
Mockito.eq(socket),
389-
Mockito.any(),
390-
Mockito.any(),
391-
Mockito.any(),
392-
Mockito.any(),
386+
Mockito.when(detachedSocketFactory.create(Mockito.any())).thenReturn(socket);
387+
388+
Mockito.when(socketFactoryRegistry.lookup("https")).thenReturn(sslSocketFactory);
389+
Mockito.when(sslSocketFactory.createLayeredSocket(
390+
Mockito.same(socket),
391+
Mockito.eq("somehost"),
392+
Mockito.eq(8443),
393393
Mockito.any(),
394394
Mockito.any())).thenReturn(socket);
395395

396396
mgr.connect(endpoint1, null, context);
397397

398398
Mockito.verify(dnsResolver, Mockito.times(1)).resolve("somehost");
399399
Mockito.verify(schemePortResolver, Mockito.times(1)).resolve(target);
400-
Mockito.verify(plainSocketFactory, Mockito.times(1)).createSocket(null, context);
401-
Mockito.verify(plainSocketFactory, Mockito.times(1)).connectSocket(
402-
socket,
403-
target,
404-
new InetSocketAddress(remote, 8443),
405-
new InetSocketAddress(local, 0),
406-
Timeout.ofMilliseconds(234),
407-
tlsConfig,
408-
context);
400+
Mockito.verify(detachedSocketFactory, Mockito.times(1)).create(null);
401+
Mockito.verify(socket, Mockito.times(1)).connect(new InetSocketAddress(remote, 8443), 234);
402+
Mockito.verify(sslSocketFactory).createLayeredSocket(socket, "somehost", 8443, tlsConfig, context);
409403

410404
mgr.connect(endpoint1, TimeValue.ofMilliseconds(123), context);
411405

412406
Mockito.verify(dnsResolver, Mockito.times(2)).resolve("somehost");
413407
Mockito.verify(schemePortResolver, Mockito.times(2)).resolve(target);
414-
Mockito.verify(plainSocketFactory, Mockito.times(2)).createSocket(null, context);
415-
Mockito.verify(plainSocketFactory, Mockito.times(1)).connectSocket(
416-
socket,
417-
target,
418-
new InetSocketAddress(remote, 8443),
419-
new InetSocketAddress(local, 0),
420-
Timeout.ofMilliseconds(123),
421-
tlsConfig,
422-
context);
408+
Mockito.verify(detachedSocketFactory, Mockito.times(2)).create(null);
409+
Mockito.verify(socket, Mockito.times(1)).connect(new InetSocketAddress(remote, 8443), 123);
410+
Mockito.verify(sslSocketFactory, Mockito.times(2)).createLayeredSocket(socket, "somehost", 8443, tlsConfig, context);
423411
}
424412

425413
@Test
@@ -453,31 +441,15 @@ public void testProxyConnectAndUpgrade() throws Exception {
453441
Mockito.when(dnsResolver.resolve("someproxy")).thenReturn(new InetAddress[] {remote});
454442
Mockito.when(schemePortResolver.resolve(proxy)).thenReturn(8080);
455443
Mockito.when(schemePortResolver.resolve(target)).thenReturn(8443);
456-
Mockito.when(socketFactoryRegistry.lookup("http")).thenReturn(plainSocketFactory);
457444
Mockito.when(socketFactoryRegistry.lookup("https")).thenReturn(sslSocketFactory);
458-
Mockito.when(plainSocketFactory.createSocket(Mockito.any(), Mockito.any())).thenReturn(socket);
459-
Mockito.when(plainSocketFactory.connectSocket(
460-
Mockito.eq(socket),
461-
Mockito.any(),
462-
Mockito.any(),
463-
Mockito.any(),
464-
Mockito.any(),
465-
Mockito.any(),
466-
Mockito.any())).thenReturn(socket);
445+
Mockito.when(detachedSocketFactory.create(Mockito.any())).thenReturn(socket);
467446

468447
mgr.connect(endpoint1, null, context);
469448

470449
Mockito.verify(dnsResolver, Mockito.times(1)).resolve("someproxy");
471450
Mockito.verify(schemePortResolver, Mockito.times(1)).resolve(proxy);
472-
Mockito.verify(plainSocketFactory, Mockito.times(1)).createSocket(null, context);
473-
Mockito.verify(plainSocketFactory, Mockito.times(1)).connectSocket(
474-
socket,
475-
proxy,
476-
new InetSocketAddress(remote, 8080),
477-
new InetSocketAddress(local, 0),
478-
Timeout.ofMilliseconds(234),
479-
tlsConfig,
480-
context);
451+
Mockito.verify(detachedSocketFactory, Mockito.times(1)).create(null);
452+
Mockito.verify(socket, Mockito.times(1)).connect(new InetSocketAddress(remote, 8080), 234);
481453

482454
Mockito.when(conn.getSocket()).thenReturn(socket);
483455

0 commit comments

Comments
 (0)