【安卓开发系列 -- APP 开源框架】网络请求框架 OKHTTP -- 连接的建立与池化
【安卓开发系列 -- APP 开源框架】网络请求框架 OKHTTP -- 连接的建立与池化【1】建立连接在ConnectInterceptor拦截器中开始建立连接,建立连接的代码如下public final class ConnectInterceptor implements Interceptor {...@Override public Response intercept(Chain ch
·
【安卓开发系列 -- APP 开源框架】网络请求框架 OKHTTP -- 连接的建立与池化
【1】建立连接
在ConnectInterceptor拦截器中开始建立连接,建立连接的代码如下
public final class ConnectInterceptor implements Interceptor {
...
@Override public Response intercept(Chain chain) throws IOException {
...
// 创建输出流
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
// 获取已经建立的连接
RealConnection connection = streamAllocation.connection();
// 责任链中的任务向下传递
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
}
【1.1】StreamAllocation.newStream 方法分析
public final class StreamAllocation {
// StreamAllocation在RetryAndFollowUpInterceptor拦截器中创建
// 连接池
// ConnectionPool实例在OkHttpClient的内部类Builder的构造函数中创建
private final ConnectionPool connectionPool;
// StreamAllocation实例的HTTP编解码器
private HttpCodec codec;
...
// 1. StreamAllocation.newStream() 主要任务是创建HttpCodec;
// 2. 根据 OkHttpClient中的设置,即连接超时、读超时、写超时及连接失败是否重试,
// 调用findHealthyConnection()完成连接,即创建RealConnection实例;
// 3. 根据HTTP协议的版本创建Http1Codec或Http2Codec;
// HttpCodec编码HTTP请求并解码HTTP响应;
// HttpCodec提供操作,
// 发送请求相关,a) 写入请求头部;b) 创建请求体以用于发送请求体数据;c) 结束请求发送;
// 获得响应相关,d) 读取响应头部;e) 打开请求体以用于后续获取请求体数据;f) 取消请求执行;
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
// 完成连接,即RealConnection的创建
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
// 根据HTTP协议的版本创建Http1Codec或Http2Codec
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
// 将创建的HTTP编解码器赋值给当前StreamAllocation实例
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {
// 创建RealConnection实例
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
// 如果这是一个全新的连接,可以跳过健康检查
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
// 执行(可能较慢)检查以确认连接池中的连接仍然良好,如果连接不再良好则将其从连接池里拿出来再重新开始
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
// Forbid new streams from being created on the connection that hosts this allocation
// 禁止在承载此分配的连接上创建新流
// 释放连接
noNewStreams();
continue;
}
return candidate;
}
}
// 1. 查找是否有完整的连接可用(条件,Socket没有关闭&输入流没有关闭&输出流没有关闭&Http2连接没有关闭)
// 2. 连接池中是否有可用的连接,如果有则可用;
// 3. 如果没有可用连接则自己创建一个;
// 4. 开始TCP连接以及TLS握手操作;
// (握手操作调用RealConnection.connect()-->最终调用ava里的套接字Socket里的connect()方法)
// 5. 将新创建的连接加入连接池;
//
// 返回一个连接来托管一个新的流,优先选择现有的连接(如果存在的话),然后是池,最后建立一个新的连接;
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
Connection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
// Attempt to use an already-allocated connection. We need to be careful here because our
// already-allocated connection may have been restricted from creating new streams.
//
// 尝试使用已分配的连接,此处需要小心,因为已经分配的连接可能已经被限制在创建新的流中;
releasedConnection = this.connection;
/**
* releaseIfNoNewStreams方法
* Releases the currently held connection and returns a socket to close if the held connection
* restricts new streams from being created. With HTTP/2 multiple requests share the same
* connection so it's possible that our connection is restricted from creating new streams during
* a follow-up request.
*
* 释放当前保持的连接,并返回一个套接字,以便在保持的连接限制创建新流时关闭;
* 对于HTTP/2多个请求共享同一个连接,因此连接可能会受到限制,不能在后续请求期间创建新的流;
*/
toClose = releaseIfNoNewStreams();
// 1. 查看是否有完好的连接
if (this.connection != null) {
// We had an already-allocated connection and it's good.
result = this.connection;
releasedConnection = null;
}
if (!reportedAcquired) {
// If the connection was never reported acquired, don't report it as released!
releasedConnection = null;
}
// 2. 连接池中是否用可用的连接,有则使用
if (result == null) {
// Attempt to get a connection from the pool.
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
closeQuietly(toClose);
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
// 若已经发现了已分配的连接或已经从连接池中获取到了连接便已经完成了
return result;
}
// If we need a route selection, make one. This is a blocking operation.
// 若需要路由选择则创建一个,这是一个阻塞的操作
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
// 3. 如果没有可用连接则新建一个连接
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
// 现在存在一组IP地址,再次尝试从池中获取连接,这可能由于连接合并而匹配
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
Internal.instance.get(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}
// 若仍然没有找到适合的连接
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
// 创建一个连接并立即将其分配给该分配;这使得异步cancel()可以中断即将进行的握手;
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result, false);
}
}
// If we found a pooled connection on the 2nd time around, we're done.
// 若在第二轮中发现了连接池中的适合的连接则结束
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
// Do TCP + TLS handshakes. This is a blocking operation.
// 4. 开始TCP以及TLS握手操作,这是阻塞操作
/**************************************************************************************************
* 正式开始建立连接
* 调用RealConnection.connect()方法建立连接
*************************************************************************************************/
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
// RouteDatabase创建到目标地址的新连接时要避免的失败路由的黑名单
// RouteDatabase内部维持一个LinkedHashSet
// 该操作将route从该LinkedHashSet中移除
routeDatabase().connected(result.route());
// 5. 将新创建的连接放在连接池中
Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;
// Pool the connection.
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
// 如果同时创建了到同一地址的另一个多路复用连接则释放当前连接并获取那个连接
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
...
}
public final class RealConnection extends Http2Connection.Listener implements Connection {
...
private final ConnectionPool connectionPool;
private final Route route;
// 底层原始的Socket
private Socket rawSocket;
// 应用层Socket
private Socket socket;
private BufferedSource source;
private BufferedSink sink;
public RealConnection(ConnectionPool connectionPool, Route route) {
this.connectionPool = connectionPool;
this.route = route;
}
// connect-->connectSocket()进行socket连接-->Platform.get().connectSocket()
// -->socket.connect(address, connectTimeout);(此时进行了三次握手),握手完成后调用establishProtocol()
public void connect(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
EventListener eventListener) {
if (protocol != null) throw new IllegalStateException("already connected");
// 路由选择
RouteException routeException = null;
List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
if (route.address().sslSocketFactory() == null) {
if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication not enabled for client"));
}
String host = route.address().url().host();
if (!Platform.get().isCleartextTrafficPermitted(host)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication to " + host + " not permitted by network security policy"));
}
}
// 开始连接
while (true) {
try {
// 建立隧道连接
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
// rawSocket在connectSocket方法中被创建
if (rawSocket == null) {
// We were unable to connect the tunnel but properly closed down our resources.
// 无法连接隧道,但正确关闭了资源
break;
}
} else {
// 建立普通连接
connectSocket(connectTimeout, readTimeout, call, eventListener);
}
// 建立协议
// 不管是建立隧道连接,还是建立普通连接,都必须建立协议
// 建立协议是在建立好TCP连接之后,并且在该TCP能被用于收发数据之前执行的
// 该步骤主要为数据的加密传输做一些初始化,比如TLS握手,HTTP/2的协议协商等
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
break;
} catch (IOException e) {
// 处理异常,用于释放资源
closeQuietly(socket);
closeQuietly(rawSocket);
socket = null;
rawSocket = null;
source = null;
sink = null;
handshake = null;
protocol = null;
http2Connection = null;
eventListener.connectFailed(call, route.socketAddress(), route.proxy(), null, e);
// 排除路由异常routeException
if (routeException == null) {
routeException = new RouteException(e);
} else {
routeException.addConnectException(e);
}
if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException;
}
}
}
if (route.requiresTunnel() && rawSocket == null) {
ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: "
+ MAX_TUNNEL_ATTEMPTS);
throw new RouteException(exception);
}
// http2Connection在establishProtocol方法中创建
if (http2Connection != null) {
synchronized (connectionPool) {
allocationLimit = http2Connection.maxConcurrentStreams();
}
}
}
/**
* Does all the work to build an HTTPS connection over a proxy tunnel. The catch here is that a
* proxy server can issue an auth challenge and then close the connection.
*
* 完成通过代理隧道建立HTTPS连接的所有工作,这里的问题是代理服务器可以发出一个验证质询,然后关闭连接;
*/
private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout, Call call,
EventListener eventListener) throws IOException {
// 1. 构造一个建立隧道连接的请求;
Request tunnelRequest = createTunnelRequest();
HttpUrl url = tunnelRequest.url();
for (int i = 0; i < MAX_TUNNEL_ATTEMPTS; i++) {
// 2. 与HTTP代理服务器建立TCP连接
connectSocket(connectTimeout, readTimeout, call, eventListener);
// 3. 创建隧道,此处主要是将建立隧道连接的请求发送给HTTP代理服务器并处理它的响应
tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);
if (tunnelRequest == null) break; // Tunnel successfully created.
// The proxy decided to close the connection after an auth challenge. We need to create a new
// connection, but this time with the auth credentials.
// 代理在验证质询后决定关闭连接
// 需要创建一个新的连接,但这次使用的是身份验证凭据
closeQuietly(rawSocket);
rawSocket = null;
sink = null;
source = null;
eventListener.connectEnd(call, route.socketAddress(), route.proxy(), null);
// 重复第2和第3步MAX_TUNNEL_ATTEMPTS次,直到建立好隧道连接
}
}
/** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket. */
// 完成在原始套接字上构建完整的HTTP或HTTPS连接所需的所有工作
private void connectSocket(int connectTimeout, int readTimeout, Call call,
EventListener eventListener) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
// 根据代理类型的不同处理Socket
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
eventListener.connectStart(call, route.socketAddress(), proxy);
rawSocket.setSoTimeout(readTimeout);
try {
// 建立Socket连接
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
// 异常处理抛出连接异常
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
// The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
// 下面的try/catch块是一种伪黑客方法,用来避免android7.0上的崩溃
// More details:
// https://github.com/square/okhttp/issues/3245
// https://android-review.googlesource.com/#/c/271775/
try {
// 初始化Okio的source/sink
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
} catch (NullPointerException npe) {
if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
throw new IOException(npe);
}
}
}
/**
* To make an HTTPS connection over an HTTP proxy, send an unencrypted CONNECT request to create
* the proxy connection. This may need to be retried if the proxy requires authorization.
*
* 要通过HTTP代理建立HTTPS连接,请发送未加密的CONNECT请求以创建代理连接;
* 如果代理需要授权则可能需要重试;
*/
private Request createTunnel(int readTimeout, int writeTimeout, Request tunnelRequest,
HttpUrl url) throws IOException {
// Make an SSL Tunnel on the first message pair of each SSL + proxy connection.
// 在每个SSL + 代理连接的第一个消息对上创建一个SSL隧道
String requestLine = "CONNECT " + Util.hostHeader(url, true) + " HTTP/1.1";
while (true) {
// 发送请求
Http1Codec tunnelConnection = new Http1Codec(null, null, source, sink);
source.timeout().timeout(readTimeout, MILLISECONDS);
sink.timeout().timeout(writeTimeout, MILLISECONDS);
tunnelConnection.writeRequest(tunnelRequest.headers(), requestLine);
tunnelConnection.finishRequest();
// 处理响应
Response response = tunnelConnection.readResponseHeaders(false)
.request(tunnelRequest)
.build();
// The response body from a CONNECT should be empty, but if it is not then we should consume
// it before proceeding.
// 来自CONNECT的响应体应该是空的,但是如果不是,那么应该在继续之前消费该响应
long contentLength = HttpHeaders.contentLength(response);
if (contentLength == -1L) {
contentLength = 0L;
}
Source body = tunnelConnection.newFixedLengthSource(contentLength);
Util.skipAll(body, Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
body.close();
switch (response.code()) {
case HTTP_OK:
// Assume the server won't send a TLS ServerHello until we send a TLS ClientHello. If
// that happens, then we will have buffered bytes that are needed by the SSLSocket!
// This check is imperfect: it doesn't tell us whether a handshake will succeed, just
// that it will almost certainly fail because the proxy has sent unexpected data.
//
// 假设服务器在发送TLS ClientHello之前不会发送TLS ServerHello;
// 如果该情况发生,则可获取拥有SSLSocket所需的缓冲字节
// 这个检查是不完美的,它无法确定握手是否成功,但是它几乎可以确定是否失败,因为代理发送了意外的数据
if (!source.buffer().exhausted() || !sink.buffer().exhausted()) {
throw new IOException("TLS tunnel buffered too many bytes!");
}
return null;
// HTTP响应码 407,Proxy Authentication Required,请求要求代理的身份认证,请求者应当使用代理进行授权
case HTTP_PROXY_AUTH:
// 重新构建请求
tunnelRequest = route.address().proxyAuthenticator().authenticate(route, response);
if (tunnelRequest == null) throw new IOException("Failed to authenticate with proxy");
if ("close".equalsIgnoreCase(response.header("Connection"))) {
return tunnelRequest;
}
break;
default:
throw new IOException(
"Unexpected response code for CONNECT: " + response.code());
}
}
}
...
}
【2】连接池
【2.1】创建连接池
public final class ConnectionPool {
...
private final int maxIdleConnections;
private final long keepAliveDurationNs;
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}
public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.maxIdleConnections = maxIdleConnections;
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
}
}
public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory {
...
public static final class Builder {
...
connectionPool = new ConnectionPool();
...
}
}
【2.2】存储连接
public final class ConnectionPool {
...
// 用于存放连接实例的双端队列
private final Deque<RealConnection> connections = new ArrayDeque<>();
/**********************************************************************
* 存储连接
*********************************************************************/
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
// 将cleanup任务提交到线程池中
//
// 没有任何连接时,cleanupRunning=false;
// 即没有任何链接时才会去执行executor.execute(cleanupRunnable);
// 从而保证每个连接池最多只能运行一个线程
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
// 将连接实例添加入连接存储的双端队列
connections.add(connection);
}
}
public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory {
...
static {
Internal.instance = new Internal() {
// put方法在ConnectInterceptor-->intercept-->
// streamAllocation.newStream-->
// findHealthyConnection-->创建新链接后调用
@Override public void put(ConnectionPool pool, RealConnection connection) {
pool.put(connection);
}
};
}
...
}
【2.3】清理连接
public final class ConnectionPool {
...
/**
* Background threads are used to cleanup expired connections. There will be at most a single
* thread running per connection pool. The thread pool executor permits the pool itself to be
* garbage collected.
* 后台线程用于清理过期的连接,每个连接池最多只能运行一个线程,线程池执行器允许池本身被垃圾收集;
* 连接池中维护类一个线程池,该线程池中只开启了一个线程用来清理链接
*/
// corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,使用SynchronousQueue直接提交队列
// 在执行execute会立即交给复用的线程或新创建线程执行任务
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
// 清理连接,在线程池executor里调用
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
// 执行清理并返回下次需要清理的时间
// 没有任何连接时,cleanupRunning=false;
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
// 在timeout时间内释放锁
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
/**
* Performs maintenance on this pool, evicting the connection that has been idle the longest if
* either it has exceeded the keep alive limit or the idle connections limit.
* 对该池执行维护,如果它超出保持活动限制或空闲连接限制,则移除空闲时间最长的连接;
*
* Returns the duration in nanos to sleep until the next scheduled call to this method. Returns
* -1 if no further cleanups are required.
* 返回纳秒级的持续时间,直到下次调用此方法的周期为止,如果不需要进一步清理则返回-1;
*/
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
// 查找需要移除的连接或下一次回收的周期
synchronized (this) {
// 遍历所有的连接,标记不活跃的连接
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// If the connection is in use, keep searching.
// 1. 查询此连接内部的StreanAllocation的引用数量
// 即确认连接是否可用
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
// 2. 标记空闲连接
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// We've found a connection to evict. Remove it from the list, then close it below (outside
// of the synchronized block).
// 3. 如果空闲连接超过5个或者keepalive时间大于5分钟则清理该连接
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
// 4. 返回此连接的到期时间,指示触发下次清理调用
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
// 5. 全部都是活跃连接,则5分钟后再进行清理
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
// 无连接空闲或正在使用
cleanupRunning = false;
return -1;
}
}
// 关闭空闲连接的Socket
closeQuietly(longestIdleConnection.socket());
// Cleanup again immediately.
// 返回0则立即进行清理
return 0;
}
/**
* Prunes any leaked allocations and then returns the number of remaining live allocations on
* {@code connection}. Allocations are leaked if the connection is tracking them but the
* application code has abandoned them. Leak detection is imprecise and relies on garbage
* collection.
*
* 删除所有泄漏的分配然后返回连接上剩余的活动分配数
* 如果连接正在跟踪分配,但应用程序代码已放弃分配,则会泄漏这些分配
* 泄漏检测不精确并且依赖于垃圾收集
*/
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
// public final class RealConnection extends Http2Connection.Listener implements Connection {
// /** Current streams carried by this connection. */
// // 此连接携带的当前流
// public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();
// }
//
// 获取虚引用列表
List<Reference<StreamAllocation>> references = connection.allocations;
// 遍历虚引用列表
for (int i = 0; i < references.size(); ) {
Reference<StreamAllocation> reference = references.get(i);
// 如果虚引用StreamAllocation正在被使用则跳过并进行下一次循环
if (reference.get() != null) {
i++;
continue;
}
// We've discovered a leaked allocation. This is an application bug.
// 发现了泄漏的分配
// 记录日志信息
StreamAllocation.StreamAllocationReference streamAllocRef =
(StreamAllocation.StreamAllocationReference) reference;
String message = "A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?";
Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
// 移除该StreamAllocation引
references.remove(i);
// 标记该连接没有分配的流
connection.noNewStreams = true;
// If this was the last allocation, the connection is eligible for immediate eviction.
// 如果这是最后一次分配则连接可以立即收回
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
return references.size();
}
...
}
【2.4】获取连接
public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory {
...
static {
Internal.instance = new Internal() {
// get方法在ConnectInterceptor-->intercept-->
// streamAllocation.newStream-->
// findHealthyConnection-->findConnection方法中调用,以尝试在连接池中获取连接
@Override public RealConnection get(ConnectionPool pool, Address address,
StreamAllocation streamAllocation, Route route) {
return pool.get(address, streamAllocation, route);
}
};
}
...
}
public final class ConnectionPool {
...
// 用于存放连接实例的双端队列
private final Deque<RealConnection> connections = new ArrayDeque<>();
/**********************************************************************
* 获取连接
*********************************************************************/
@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
// 遍历存放连接实例的双端队列
for (RealConnection connection : connections) {
// isEligible方法
// Returns true if this connection can carry a stream allocation to {@code address}
// 若当前连接可以携带到地址address的流分配则返回true
if (connection.isEligible(address, route)) {
// 使用streamAllocation保存该连接
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}
}
【2.5】复用连接
public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory {
...
static {
Internal.instance = new Internal() {
// deduplicate方法在ConnectInterceptor-->intercept-->
// streamAllocation.newStream-->
// findHealthyConnection-->findConnection方法中调用,用于连接的多路复用处理
@Override public Socket deduplicate(
ConnectionPool pool, Address address, StreamAllocation streamAllocation) {
return pool.deduplicate(address, streamAllocation);
}
};
}
...
}
public final class ConnectionPool {
...
// 用于存放连接实例的双端队列
private final Deque<RealConnection> connections = new ArrayDeque<>();
/**********************************************************************
* 复用连接
*********************************************************************/
/**
* Replaces the connection held by {@code streamAllocation} with a shared connection if possible.
* This recovers when multiple multiplexed connections are created concurrently.
*
* 如果可能,将streamAllocation保留的连接替换为共享连接
* 当同时创建多个多路复用连接时将恢复
*/
@Nullable Socket deduplicate(Address address, StreamAllocation streamAllocation) {
assert (Thread.holdsLock(this));
// 遍历连接池
for (RealConnection connection : connections) {
// isEligible方法
// Returns true if this connection can carry a stream allocation to {@code address}
// 若当前连接可以携带到地址address的流分配则返回true
if (connection.isEligible(address, null)
/**
* isMultiplexed方法
* Returns true if this is an HTTP/2 connection.
* Such connections can be used in multiple HTTP requests simultaneously.
*
* 如果这是HTTP/2连接,则返回true
* HTTP/2连接可以同时用于多个HTTP请求
*/
&& connection.isMultiplexed()
// 连接池中取出的连接不是streamAllocation中维持的连接
&& connection != streamAllocation.connection()) {
// releaseAndAcquire方法
// Release the connection held by this connection and acquire newConnection instead
// 释放此连接实例所保持的连接并获取新连接
return streamAllocation.releaseAndAcquire(connection);
}
}
return null;
}
}
参考致谢
本博客为博主的学习实践总结,并参考了众多博主的博文,在此表示感谢,博主若有不足之处,请批评指正。
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)