前言
本文是对
OkHttp
开源库的一个详细解析,如果你觉得自己不够了解
OkHttp
,想进一步学习一下,相信本文对你会有所帮助。
本文包含了详细的请求流程分析、各大拦截器解读以及自己的一点反思总结,文章很长,欢迎大家一起交流讨论。
使用方法
使用方法十分简单,分别创建一个
OkHttpClient
对象,一个
Request
对象,然后利用他们创建一个
Call
对象,最后调用同步请求
execute()
方法或者异步请求
enqueue()
方法来拿到
Response
。
private final OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url("https://github.com/")
.build();
//同步请求
Response response = client.newCall(request).execute();
//todo handle response
//异步请求
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) {
//todo handle request failed
}
@Override
public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
//todo handle Response
}
});
基本对象介绍
正如使用方法中所述,我们先后构建了
OkHttpClient
对象、
Request
对象、
Call
对象,那这些对象都是什么意思,有什么作用呢?这个就需要我们进一步学习了解了。
OkHttpClient
一个请求的配置类,采用了
建造者模式
,方便用户配置一些请求参数,如配置
callTimeout
,
cookie
,
interceptor
等等。
open class OkHttpClient internal constructor(
builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
constructor() : this(Builder())
class Builder constructor() {
//调度器
internal var dispatcher: Dispatcher = Dispatcher()
//连接池
internal var connectionPool: ConnectionPool = ConnectionPool()
//整体流程拦截器
internal val interceptors: MutableList<Interceptor> = mutableListOf()
//网络流程拦截器
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
//流程监听器
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
//连接失败时是否重连
internal var retryOnConnectionFailure = true
//服务器认证设置
internal var authenticator: Authenticator = Authenticator.NONE
//是否重定向
internal var followRedirects = true
//是否从HTTP重定向到HTTPS
internal var followSslRedirects = true
//cookie设置
internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
//缓存设置
internal var cache: Cache? = null
//DNS设置
internal var dns: Dns = Dns.SYSTEM
//代理设置
internal var proxy: Proxy? = null
//代理选择器设置
internal var proxySelector: ProxySelector? = null
//代理服务器认证设置
internal var proxyAuthenticator: Authenticator = Authenticator.NONE
//socket配置
internal var socketFactory: SocketFactory = SocketFactory.getDefault()
//https socket配置
internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
internal var x509TrustManagerOrNull: X509TrustManager? = null
internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
//协议
internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
//域名校验
internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
internal var certificateChainCleaner: CertificateChainCleaner? = null
//请求超时
internal var callTimeout = 0
//连接超时
internal var connectTimeout = 10_000
//读取超时
internal var readTimeout = 10_000
//写入超时
internal var writeTimeout = 10_000
internal var pingInterval = 0
internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
internal var routeDatabase: RouteDatabase? = null
···省略代码···
Request
同样是请求参数的配置类,也同样采用了
建造者模式
,但相比于
OkHttpClient
,
Request
就十分简单了,只有四个参数,分别是
请求URL
、
请求方法
、
请求头
、
请求体
。
class Request internal constructor(
@get:JvmName("url") val url: HttpUrl,
@get:JvmName("method") val method: String,
@get:JvmName("headers") val headers: Headers,
@get:JvmName("body") val body: RequestBody?,
internal val tags: Map<Class<*>, Any>
) {
open class Builder {
//请求的URL
internal var url: HttpUrl? = null
//请求方法,如:GET、POST..
internal var method: String
//请求头
internal var headers: Headers.Builder
//请求体
internal var body: RequestBody? = null
···省略代码···
Call
请求调用接口,表示这个请求已经准备好
可以执行
,也
可以取消
,
只能执行一次
。
interface Call : Cloneable {
/** 返回发起此调用的原始请求 */
fun request(): Request
/**
* 同步请求,立即执行。
*
* 抛出两种异常:
@Throws(IOException::class)
fun execute(): Response
/**
* 异步请求,将请求安排在将来的某个时间点执行。
* 如果在执行过一回的前提下再次执行抛出IllegalStateException */
fun enqueue(responseCallback: Callback)
/** 取消请求。已经完成的请求不能被取消 */
fun cancel()
/** 是否已被执行 */
fun isExecuted(): Boolean
/** 是否被取消 */
fun isCanceled(): Boolean
/** 一个完整Call请求流程的超时时间配置,默认选自[OkHttpClient.Builder.callTimeout] */
fun timeout(): Timeout
/** 克隆这个call,创建一个新的相同的Call */
public override fun clone(): Call
/** 利用工厂模式来让 OkHttpClient 来创建 Call对象 */
fun interface Factory {
fun newCall(request: Request): Call
}
}
RealCall
在
OkHttpClient
中,我们利用
newCall
方法来创建一个
Call
对象,但从源码中可以看出,
newCall
方法返回的是一个
RealCall
对象。
OkHttpClient.kt
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
RealCall
是
Call接口
的具体实现类,是应用端与网络层的连接桥,展示应用端原始的请求与连接数据,以及网络层返回的
response
及其它数据流。 通过使用方法也可知,创建
RealCall
对象后,就要调用同步或异步请求方法,所以它里面还包含
同步请求 execute()
与
异步请求 enqueue()
方法。(后面具体展开分析)
AsyncCall
异步请求调用,是
RealCall
的一个内部类,就是一个
Runnable
,被调度器中的线程池所执行。
inner class AsyncCall(
//用户传入的响应回调方法
private val responseCallback: Callback
) : Runnable {
//同一个域名的请求次数,volatile + AtomicInteger 保证在多线程下及时可见性与原子性
@Volatile var callsPerHost = AtomicInteger(0)
private set
fun reuseCallsPerHostFrom(other: AsyncCall) {
this.callsPerHost = other.callsPerHost
}
···省略代码···
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
//调用线程池执行
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
//请求失败,调用 Callback.onFailure() 方法
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
//请求失败,调用调度器finish方法
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
//请求成功,获取到服务器返回的response
val response = getResponseWithInterceptorChain()
signalledCallback = true
//调用 Callback.onResponse() 方法,将 response 传递出去
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
//请求失败,调用 Callback.onFailure() 方法
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
//请求出现异常,调用cancel方法来取消请求
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
//请求失败,调用 Callback.onFailure() 方法
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
//请求结束,调用调度器finish方法
client.dispatcher.finished(this)
}
}
}
}
Dispatcher
调度器,用来调度
Call
对象,同时包含线程池与异步请求队列,用来存放与执行
AsyncCall
对象。
class Dispatcher constructor() {
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
//创建一个缓存线程池,来处理请求调用
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
/** 已准备好的异步请求队列 */
@get:Synchronized
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
/** 正在运行的异步请求队列, 包含取消但是还未finish的AsyncCall */
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
/** 正在运行的同步请求队列, 包含取消但是还未finish的RealCall */
private val runningSyncCalls = ArrayDeque<RealCall>()
···省略代码···
}
总结一下
对象
|
作用
|
Call
|
请求调用接口,表示这个请求已经准备好可以执行,也可以被取消,只能执行一次。
|
RealCall
|
Call
接口的具体实现类,是应用与网络层之间的连接桥,包含
OkHttpClient
与
Request
信息。
|
AsyncCall
|
异步请求调用,其实就是个
Runnable
,会被放到线程池中进行处理。
|
Dispatcher
|
调度器,用来调度
Call
对象,同时包含线程池与异步请求队列,用来存放与执行
AsyncCall
对象。
|
Request
|
请求类,包含
url
、
method
、
headers
、
body
。
|
Response
|
网络层返回的响应数据。
|
Callback
|
响应回调函数接口,包含
onFailure
、
onResponse
两个方法。
|
流程分析
介绍完了对象,接下来就根据使用方法,具体看一下源码吧。
同步请求
同步请求的使用方法。
client.newCall(request).execute();
newCall
方法就是创建一个
RealCall
对象,然后执行其
execute()
方法。
RealCall.kt
override fun execute(): Response {
//CAS判断是否已经被执行了, 确保只能执行一次,如果已经执行过,则抛出异常
check(executed.compareAndSet(false, true)) { "Already Executed" }
//请求超时开始计时
timeout.enter()
//开启请求监听
callStart()
try {
//调用调度器中的 executed() 方法,调度器只是将 call 加入到了runningSyncCalls队列中
client.dispatcher.executed(this)
//调用getResponseWithInterceptorChain 方法拿到 response
return getResponseWithInterceptorChain()
} finally {
//执行完毕,调度器将该 call 从 runningSyncCalls队列中移除
client.dispatcher.finished(this)
}
}
调用调度器
executed
方法,就是将当前的
RealCall
对象加入到
runningSyncCalls
队列中,然后调用
getResponseWithInterceptorChain
方法拿到
response
。
异步请求
在来看看异步请求。
RealCall.kt
override fun enqueue(responseCallback: Callback) {
//CAS判断是否已经被执行了, 确保只能执行一次,如果已经执行过,则抛出异常
check(executed.compareAndSet(false, true)) { "Already Executed" }
//开启请求监听
callStart()
//新建一个AsyncCall对象,通过调度器enqueue方法加入到readyAsyncCalls队列中
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
然后调用调度器的
enqueue
方法,
Dispatcher.kt
internal fun enqueue(call: AsyncCall) {
//加锁,保证线程安全
synchronized(this) {
//将该请求调用加入到 readyAsyncCalls 队列中
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
//通过域名来查找有没有相同域名的请求,有则复用。
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
//执行请求
promoteAndExecute()
}
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
//判断是否有请求正在执行
val isRunning: Boolean
//加锁,保证线程安全
synchronized(this) {
//遍历 readyAsyncCalls 队列
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
//runningAsyncCalls 的数量不能大于最大并发请求数 64
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
//同域名最大请求数5,同一个域名最多允许5条线程同时执行请求
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
//从 readyAsyncCalls 队列中移除,并加入到 executableCalls 及 runningAsyncCalls 队列中
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
//通过运行队列中的请求数量来判断是否有请求正在执行
isRunning = runningCallsCount() > 0
}
//遍历可执行队列,调用线程池来执行AsyncCall
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
调度器的
enqueue
方法就是将
AsyncCall
加入到
readyAsyncCalls
队列中,然后调用
promoteAndExecute
方法来执行请求,
promoteAndExecute
方法做的其实就是遍历
readyAsyncCalls
队列,然后将符合条件的请求用线程池执行,也就是会执行
AsyncCall.run()
方法。
AsyncCall 方法的具体代码看
基本对象介绍 AsyncCall
,这边就不在此展示了,简单来说就是调用
getResponseWithInterceptorChain
方法拿到
response
,然后通过
Callback.onResponse
方法传递出去。反之,如果请求失败,捕获了异常,就通过
Callback.onFailure
将异常信息传递出去。 最终,请求结束,调用调度器
finish
方法。
Dispatcher.kt
/** 异步请求调用结束方法 */
internal fun finished(call: AsyncCall) {
call.callsPerHost.decrementAndGet()
finished(runningAsyncCalls, call)
}
/** 同步请求调用结束方法 */
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
//将当前请求调用从 正在运行队列 中移除
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
//继续执行剩余请求,将call从readyAsyncCalls中取出加入到runningAsyncCalls,然后执行
val isRunning = promoteAndExecute()
if (!isRunning && idleCallback != null) {
//如果执行完了所有请求,处于闲置状态,调用闲置回调方法
idleCallback.run()
}
}
获取Response
接着就是看看
getResponseWithInterceptorChain
方法是如何拿到
response
的。
internal fun getResponseWithInterceptorChain(): Response {
//拦截器列表
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
//构建拦截器责任链
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
//如果call请求完成,那就意味着交互完成了,没有更多的东西来交换了
var calledNoMoreExchanges = false
try {
//执行拦截器责任链来获取 response
val response = chain.proceed(originalRequest)
//如果被取消,关闭响应,抛出异常
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
简单概括一下:这里采用了
责任链设计模式
,通过拦截器构建了以
RealInterceptorChain
责任链,然后执行
proceed
方法来得到
response
。
那么,这又涉及
拦截器
是什么?
拦截器责任链
又是什么?
Interceptor
只声明了一个拦截器方法,在子类中具体实现,还包含一个
Chain
接口,核心方法是
proceed(request)
处理请求来获取
response
。
fun interface Interceptor {
/** 拦截方法 */
@Throws(IOException::class)
fun intercept(chain: Chain): Response
interface Chain {
/** 原始请求数据 */
fun request(): Request
/** 核心方法,处理请求,获取response */
@Throws(IOException::class)
fun proceed(request: Request): Response
fun connection(): Connection?
fun call(): Call
fun connectTimeoutMillis(): Int
fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain
fun readTimeoutMillis(): Int
fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain
fun writeTimeoutMillis(): Int
fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain
}
}
RealInterceptorChain
拦截器链就是实现
Interceptor.Chain
接口,重点就是复写的
proceed
方法。
class RealInterceptorChain(
internal val call: RealCall,
private val interceptors: List<Interceptor>,
private val index: Int,
internal val exchange: Exchange?,
internal val request: Request,
internal val connectTimeoutMillis: Int,
internal val readTimeoutMillis: Int,
internal val writeTimeoutMillis: Int
) : Interceptor.Chain {
···省略代码···
private var calls: Int = 0
override fun call(): Call = call
override fun request(): Request = request
@Throws(IOException::class)
override fun proceed(request: Request): Response {
check(index < interceptors.size)
calls++
if (exchange != null) {
check(exchange.finder.sameHostAndPort(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
check(calls == 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
}
//index+1, 复制创建新的责任链,也就意味着调用责任链中的下一个处理者,也就是下一个拦截器
val next = copy(index = index + 1, request = request)
//取出当前拦截器
val interceptor = interceptors[index]
//执行当前拦截器的拦截方法
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
if (exchange != null) {
check(index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}
}
check(response.body != null) { "interceptor $interceptor returned a response with no body" }
return response
}
}
链式调用,最终会执行拦截器列表中的每个拦截器,返回
Response
。
拦截器
OK,接下来就该看看拦截器列表中的具体拦截器了。
先上各类拦截器的总结,按顺序:
-
client.interceptors
:这是由开发者设置的,会在所有的拦截器处理之前进行
最早
的拦截处理,可用于添加一些公共参数,如
自定义header
、
自定义log
等等。
-
RetryAndFollowUpInterceptor
:这里会对连接做一些初始化工作,以及请求失败的重试工作,重定向的后续请求工作。跟他的名字一样,就是做重试工作还有一些连接跟踪工作。
-
BridgeInterceptor
:是客户端与服务器之间的沟通桥梁,负责将用户构建的请求转换为服务器需要的请求,以及将网络请求返回回来的响应转换为用户可用的响应。
-
CacheInterceptor
:这里主要是缓存的相关处理,会根据用户在
OkHttpClient
里定义的缓存配置,然后结合请求新建一个缓存策略,由它来判断是使用网络还是缓存来构建
response
。
-
ConnectInterceptor
:这里主要就是负责建立连接,会建立
TCP连接
或者
TLS连接
。
-
client.networkInterceptors
:这里也是开发者自己设置的,所以本质上和第一个拦截器差不多,但是由于位置不同,所以用处也不同。
-
CallServerInterceptor
:这里就是进行网络数据的请求和响应了,也就是实际的网络I/O操作,将请求头与请求体发送给服务器,以及解析服务器返回的
response
。
接下来我们按顺序,从上往下,对这些拦截器进行一一解读。
client.interceptors
这是用户自己定义的拦截器,称为
应用拦截器
,会保存在
OkHttpClient
的
interceptors: List<Interceptor>
列表中。 他是拦截器责任链中的
第一个拦截器
,也就是说会第一个执行拦截方法,我们可以通过它来添加
自定义Header信息
,如:
class HeaderInterceptor implements Interceptor {
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request().newBuilder()
.addHeader("device-android", "xxxxxxxxxxx")
.addHeader("country-code", "ZH")
.build();
return chain.proceed(request);
}
}
//然后在 OkHttpClient 中加入
OkHttpClient client = new OkHttpClient.Builder()
.connectTimeout(60, TimeUnit.SECONDS)
.readTimeout(15, TimeUnit.SECONDS)
.writeTimeout(15, TimeUnit.SECONDS)
.cookieJar(new MyCookieJar())
.addInterceptor(new HeaderInterceptor())//添加自定义Header拦截器
.build();
RetryAndFollowUpInterceptor
第二个拦截器,从它的名字也可知道,它负责请求失败的重试工作与重定向的后续请求工作,同时它会对连接做一些初始化工作。
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
var followUpCount = 0
var priorResponse: Response? = null
var newExchangeFinder = true
var recoveredFailures = listOf<IOException>()
while (true) {
//这里会新建一个ExchangeFinder,ConnectInterceptor会使用到
call.enterNetworkInterceptorExchange(request, newExchangeFinder)
var response: Response
var closeActiveExchange = true
try {
if (call.isCanceled()) {
throw IOException("Canceled")
}
try {
response = realChain.proceed(request)
newExchangeFinder = true
} catch (e: RouteException) {
//尝试通过路由连接失败。该请求将不会被发送。
if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
throw e.firstConnectException.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e.firstConnectException
}
newExchangeFinder = false
continue
} catch (e: IOException) {
//尝试与服务器通信失败。该请求可能已发送。
if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
throw e.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e
}
newExchangeFinder = false
continue
}
//尝试关联上一个response,注意:body是为null
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()
}
val exchange = call.interceptorScopedExchange
//会根据 responseCode 来判断,构建一个新的request并返回来重试或者重定向
val followUp = followUpRequest(response, exchange)
if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
call.timeoutEarlyExit()
}
closeActiveExchange = false
return response
}
//如果请求体是一次性的,不需要再次重试
val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}
response.body?.closeQuietly()
//最大重试次数,不同的浏览器是不同的,比如:Chrome为21,Safari则是16
if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException("Too many follow-up requests: $followUpCount")
}
request = followUp
priorResponse = response
} finally {
call.exitNetworkInterceptorExchange(closeActiveExchange)
}
}
}
/** 判断是否要进行重连,false->不尝试重连;true->尝试重连。*/
private fun recover(
e: IOException,
call: RealCall,
userRequest: Request,
requestSendStarted: Boolean
): Boolean {
//客户端禁止重试
if (!client.retryOnConnectionFailure) return false
//不能再次发送该请求体
if (requestSendStarted && requestIsOneShot(e, userRequest)) return false
//发生的异常是致命的,无法恢复,如:ProtocolException
if (!isRecoverable(e, requestSendStarted)) return false
//没有更多的路由来尝试重连
if (!call.retryAfterFailure()) return false
// 对于失败恢复,使用带有新连接的相同路由选择器
return true
}
···省略代码···
BridgeInterceptor
从它的名字可以看出,他的定位是客户端与服务器之间的沟通桥梁,负责将用户构建的请求转换为服务器需要的请求,比如:添加
Content-Type
,添加
Cookie
,添加
User-Agent
等等。再将服务器返回的
response
做一些处理转换为客户端需要的
response
。比如:移除响应头中的
Content-Encoding
、
Content-Length
等等。
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
//获取原始请求数据
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
//重新构建请求头,请求体信息
val body = userRequest.body
val contentType = body.contentType()
requestBuilder.header("Content-Type", contentType.toString())
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.header("Host", userRequest.url.toHostHeader())
requestBuilder.header("Connection", "Keep-Alive")
···省略代码···
//添加cookie
val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}
//添加user-agent
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}
//重新构建一个Request,然后执行下一个拦截器来处理该请求
val networkResponse = chain.proceed(requestBuilder.build())
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
//创建一个新的responseBuilder,目的是将原始请求数据构建到response中
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)
if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
//修改response header信息,移除Content-Encoding,Content-Length信息
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
//修改response body信息
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}
return responseBuilder.build()
···省略代码···
CacheInterceptor
用户可以通过
OkHttpClient.cache
来配置缓存,缓存拦截器通过
CacheStrategy
来判断是使用网络还是缓存来构建
response
。
class CacheInterceptor(internal val cache: Cache?) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val call = chain.call()
//通过request从OkHttpClient.cache中获取缓存
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
//创建一个缓存策略,用来确定怎么使用缓存
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
//为空表示不使用网络,反之,则表示使用网络
val networkRequest = strategy.networkRequest
//为空表示不使用缓存,反之,则表示使用缓存
val cacheResponse = strategy.cacheResponse
//追踪网络与缓存的使用情况
cache?.trackResponse(strategy)
val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE
//有缓存但不适用,关闭它
if (cacheCandidate != null && cacheResponse == null) {
cacheCandidate.body?.closeQuietly()
}
//如果网络被禁止,但是缓存又是空的,构建一个code为504的response,并返回
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build().also {
listener.satisfactionFailure(call, it)
}
}
//如果我们禁用了网络不使用网络,且有缓存,直接根据缓存内容构建并返回response
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build().also {
listener.cacheHit(call, it)
}
}
//为缓存添加监听
if (cacheResponse != null) {
listener.cacheConditionalHit(call, cacheResponse)
} else if (cache != null) {
listener.cacheMiss(call)
}
var networkResponse: Response? = null
try {
//责任链往下处理,从服务器返回response 赋值给 networkResponse
networkResponse = chain.proceed(networkRequest)
} finally {
//捕获I/O或其他异常,请求失败,networkResponse为空,且有缓存的时候,不暴露缓存内容。
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
//如果有缓存
if (cacheResponse != null) {
//且网络返回response code为304的时候,使用缓存内容新构建一个Response返回。
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response.also {
listener.cacheHit(call, it)
}
} else {
//否则关闭缓存响应体
cacheResponse.body?.closeQuietly()
}
}
//构建网络请求的response
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
//如果cache不为null,即用户在OkHttpClient中配置了缓存,则将上一步新构建的网络请求response存到cache中
if (cache != null) {
//根据response的code,header以及CacheControl.noStore来判断是否可以缓存
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// 将该response存入缓存
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response).also {
if (cacheResponse != null) {
listener.cacheMiss(call)
}
}
}
//根据请求方法来判断缓存是否有效,只对Get请求进行缓存,其它方法的请求则移除
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
//缓存无效,将该请求缓存从client缓存配置中移除
cache.remove(networkRequest)
} catch (_: IOException) {
// The cache cannot be written.
}
}
}
return response
}
···省略代码···
ConnectInterceptor
负责实现与服务器真正建立起连接,
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
//初始化一个exchange对象
val exchange = realChain.call.initExchange(chain)
//根据这个exchange对象来复制创建一个新的连接责任链
val connectedChain = realChain.copy(exchange = exchange)
//执行该连接责任链
return connectedChain.proceed(realChain.request)
}
}
一扫下来,代码十分简单,拦截方法里就只有三步。
-
初始化一个
exchange
对象。
-
然后根据这个
exchange
对象来复制创建一个新的连接责任链。
-
执行该连接责任链。
那这个
exchange
对象又是什么呢?
RealCall.kt
internal fun initExchange(chain: RealInterceptorChain): Exchange {
...省略代码...
//这里的exchangeFinder就是在RetryAndFollowUpInterceptor中创建的
val exchangeFinder = this.exchangeFinder!!
//返回一个ExchangeCodec(是个编码器,为request编码以及为response解码)
val codec = exchangeFinder.find(client, chain)
//根据exchangeFinder与codec新构建一个Exchange对象,并返回
val result = Exchange(this, eventListener, exchangeFinder, codec)
...省略代码...
return result
}
具体看看
ExchangeFinder.find()
这一步,
ExchangeFinder.kt
fun find(
client: OkHttpClient,
chain: RealInterceptorChain
): ExchangeCodec {
try {
//查找合格可用的连接,返回一个 RealConnection 对象
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
//根据连接,创建并返回一个请求响应编码器:Http1ExchangeCodec 或者 Http2ExchangeCodec,分别对应Http1协议与Http2协议
return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {
trackFailure(e.lastConnectException)
throw e
} catch (e: IOException) {
trackFailure(e)
throw RouteException(e)
}
}
继续往下看
findHealthyConnection
方法
ExchangeFinder.kt
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {
while (true) {
//重点:查找连接
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
//检查该连接是否合格可用,合格则直接返回该连接
if (candidate.isHealthy(doExtensiveHealthChecks)) {
return candidate
}
//如果该连接不合格,标记为不可用,从连接池中移除
candidate.noNewExchanges()
...省略代码...
}
}
简单概括一下就是:通过
findConnection
方法来查找连接,找到连接后判断是否是合格可用的,合格就直接返回该连接。
所以核心方法就是
findConnection
,我们继续深入看看该方法:
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
if (call.isCanceled()) throw IOException("Canceled")
//第一次,尝试重连 call 中的 connection,不需要去重新获取连接
val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
if (callConnection != null) {
var toClose: Socket? = null
synchronized(callConnection) {
if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
toClose = call.releaseConnectionNoEvents()
}
}
//如果 call 中的 connection 还没有释放,就重用它。
if (call.connection != null) {
check(toClose == null)
return callConnection
}
//如果 call 中的 connection 已经被释放,关闭Socket.
toClose?.closeQuietly()
eventListener.connectionReleased(call, callConnection)
}
//需要一个新的连接,所以重置一些状态
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0
//第二次,尝试从连接池中获取一个连接,不带路由,不带多路复用
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
//连接池中是空的,准备下次尝试连接的路由
val routes: List<Route>?
val route: Route
...省略代码...
//第三次,再次尝试从连接池中获取一个连接,带路由,不带多路复用
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
route = localRouteSelection.next()
}
//第四次,手动创建一个新连接
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
call.connectionToCancel = null
}
call.client.routeDatabase.connected(newConnection.route())
//第五次,再次尝试从连接池中获取一个连接,带路由,带多路复用。
//这一步主要是为了校验一下,比如已经有了一条连接了,就可以直接复用,而不用使用手动创建的新连接。
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
val result = call.connection!!
nextRouteToTry = route
newConnection.socket().closeQuietly()
eventListener.connectionAcquired(call, result)
return result
}
synchronized(newConnection) {
//将手动创建的新连接放入连接池
connectionPool.put(newConnection)
call.acquireConnectionNoEvents(newConnection)
}
eventListener.connectionAcquired(call, newConnection)
return newConnection
}
在代码中可以看出,一共做了5次尝试去得到连接:
-
第一次,尝试重连 call 中的 connection,不需要去重新获取连接。
-
第二次,尝试从连接池中获取一个连接,不带路由,不带多路复用。
-
第三次,再次尝试从连接池中获取一个连接,带路由,不带多路复用。
-
第四次,手动创建一个新连接。
-
第五次,再次尝试从连接池中获取一个连接,带路由,带多路复用。
OK,到了这一步,就算建立起了连接。
client.networkInterceptors
该拦截器称为
网络拦截器
,与
client.interceptors
一样也是由用户自己定义的,同样是以列表的形式存在
OkHttpClient
中。
那这两个拦截器有什么不同呢?
其实他两的不同都是由于他们所处的位置不同所导致的,应用拦截器处于第一个位置,所以无论如何它
都会被执行,而且只会执行一次
。而网络拦截器处于倒数第二的位置,它
不一定会被执行,而且可能会被执行多次
,比如:在
RetryAndFollowUpInterceptor
失败或者
CacheInterceptor
直接返回缓存的情况下,我们的网络拦截器是不会被执行的。
CallServerInterceptor
到了这里,客户端与服务器已经建立好了连接,接着就是将请求头与请求体发送给服务器,以及解析服务器返回的
response
了。
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
try {
//写入请求头
exchange.writeRequestHeaders(request)
//如果不是GET请求,并且请求体不为空
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
//当请求头为"Expect: 100-continue"时,在发送请求体之前需要等待服务器返回"HTTP/1.1 100 Continue" 的response,如果没有等到该response,就不发送请求体。
//POST请求,先发送请求头,在获取到100继续状态后继续发送请求体
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
//刷新请求,即发送请求头
exchange.flushRequest()
//解析响应头
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
//写入请求体
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
//如果请求体是双公体,就先发送请求头,稍后在发送请求体
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
//写入请求体
requestBody.writeTo(bufferedRequestBody)
} else {
//如果获取到了"Expect: 100-continue"响应,写入请求体
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
···省略代码···
//请求结束,发送请求体
exchange.finishRequest()
···省略代码···
try {
if (responseBuilder == null) {
//读取响应头
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
···省略代码···
//构建一个response
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
···省略代码···
return response
···省略代码···
简单概括一下:写入发送请求头,然后根据条件是否写入发送请求体,请求结束。解析服务器返回的请求头,然后构建一个新的
response
,并返回。 这里
CallServerInterceptor
是拦截器责任链中最后一个拦截器了,所以他不会再调用
chain.proceed()
方法往下执行,而是将这个构建的
response
往上传递给责任链中的每个拦截器。
拦截器流程图
总结
我们分析了请求的流程,包括同步请求与异步请求,还仔细分析了拦截器责任链中的每个拦截器,现在画一个流程图,简单总结一下,你可以对照着流程图,在走一遍流程。
完整流程图
反思
设计模式
-
建造者模式
:不论是在
OkHttpClient
、
Request
还是
Response
中都用到了建造者模式,因为这几个类中都有很多参数,需要供用户选择需要的参数来构建其想要的实例,所以在开源库中,
Build模式
是很常见的。
-
工厂方法模式
:帮助生成复杂对象,如:
OkHttpClient.newCall(request Request) 来创建 Call 对象
。
-
责任链模式
:这个就用的很绝妙了,将7个拦截器构成拦截器责任链,然后按顺序从上往下执行,得到
Response
后,从下往上传回去。
线程安全
在
AsyncCall
类中的
callsPerHost
变量,使用了
Volatile
+
AtomicInteger
来修饰,从而保证在多线程下的线程安全。
inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
//同一个域名的请求次数,volatile + AtomicInteger 保证在多线程下及时可见性与原子性
@Volatile var callsPerHost = AtomicInteger(0)
private set
...省略代码...
数据结构
为什么
readyAsyncCalls
runningAsyncCalls
runningSyncCalls
采用
ArrayDeque
呢?
两个点回答
:
一
、他们都是用来存放网络请求的,这些请求需要做到先到先得,所以采用队列。
二
、根据代码所示,当执行
enqueue
时,我们需要遍历
readyAsyncCalls
,将符合执行条件的
Call
加入到
runningAsyncCalls
,这相对比于链表来说,数组的查找效率要更高,所以采用
ArrayDeque
。
结尾
到此,关于
OkHttp
的源码解析就介绍啦。
其实学习源码的最好方式,就是自己将代码克隆下来,然后对着使用方法,按流程,一步一步往下走。
其实分享文章的最大目的正是等待着有人指出我的错误,如果你发现哪里有错误,请毫无保留的指出即可,虚心请教。 另外,如果你觉得文章不错,对你有所帮助,请给我点个赞,就当鼓励,谢谢~Peace~!
原文:
https://juejin.cn/post/7033307467199021086