OkHttp是主流的网络请求框架,Android网络请求基本的项目封装也是有Rxjava+Retrofit+Okhttp进行封装,面对Kotlin语法可能也有的同学使用Coroutine+Retrofit+Okhttp进行封装 这篇文章并非将封装 而是对OkHttp源码性进行阅读 对OkHttp进行一步的了解,并且学习里面一些里面的设计思想。
源码是最好的老师!
本文基于okhttp:4.2.2
okhttp的基本使用:
class OkhttpActivity : AppCompatActivity(){
private val job = Job()
private val ioScope = CoroutineScope(Dispatchers.IO + job)
lateinit var handler:CoroutineExceptionHandler
companion object{
val TAG = "OkhttpActivity"
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_okhttp)
handler = CoroutineExceptionHandler{_,exception ->
run {
Log.d(TAG, "exception" + exception.message)
复制代码
同步请求方式
var client = OkHttpClient.Builder()
.connectTimeout(5, TimeUnit.SECONDS)
.build()
var request = Request.Builder().url("https://www.wanandroid.com/article/list/0/json").build()
ioScope.launch(handler) {
var response = client.newCall(request).execute()
复制代码
以上的操作是基本的实例代码 :
步骤如下:
创建
OkHttpClient
创建请求对象
Request
同步请求直接写成一句了
,实际上是先
调
newCall()
方法返回一个
Call
对象
调用
execute()
方法,最终根据返回Respone对象
首先分析:
OkHttpClient
,从
OkHttpClient.Builder()
open class OkHttpClient internal constructor(builder: Builder) :
Cloneable, Call.Factory, WebSocket.Factory {
constructor() : this(Builder())
class Builder constructor() {
internal var dispatcher: Dispatcher = Dispatcher()
internal var connectionPool: ConnectionPool = ConnectionPool()
internal val interceptors: MutableList<Interceptor> = mutableListOf()
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
internal var retryOnConnectionFailure = true
internal var authenticator: Authenticator = Authenticator.NONE
fun dispatcher(dispatcher: Dispatcher) = apply {
this.dispatcher = dispatcher
fun connectTimeout(timeout: Long, unit: TimeUnit) = apply {
connectTimeout = checkDuration("timeout", timeout, unit)
fun build(): OkHttpClient = OkHttpClient(this)
上面可以看到Builder是OkHttpClient中的内部类,内部类中的build()将自己作为参数传入OkHttpClient(this)调用了constructor() : this(Builder()),这里面的代码中,将所有设置方法返回自己本身this使用的是设计模式的建造者模式。
(可能有些同学没怎么看懂dispatcher()这个方法返回Builder本身,这个可以看一下kotlin的apply这些函数)
第二步分析建请求对象 Request
class Request internal constructor(
@get:JvmName("url") val url: HttpUrl,
fun newBuilder(): Builder = Builder(this)
open class Builder {
internal var url: HttpUrl? = null
internal var method: String
internal var headers: Headers.Builder
internal var body: RequestBody? = null
internal var tags: MutableMap<Class<*>, Any> = mutableMapOf()
constructor() {
this.method = "GET"
this.headers = Headers.Builder()
internal constructor(request: Request) {
this.url = request.url
this.method = request.method
this.body = request.body
this.tags = if (request.tags.isEmpty()) {
mutableMapOf()
} else {
request.tags.toMutableMap()
this.headers = request.headers.newBuilder()
其实跟OkHttpClient类似,Reuqest这个类也有个内部类Builder ,同样是建造者模式的一个应用,其中可以看到构造方法中 this.method = "GET"代表默认是为GET的请求的,且有个带参的构造函数constructor(request: Request)在Reuqest中被newBuilder()方法调用。(这个方法一般用于重新设置新的请求头的需求,例如 拦截器~)
接下来 分析newCall()
OkHttpClient{
override fun newCall(request: Request): Call {
return RealCall.newRealCall(this, request, forWebSocket = false)
internal class RealCall private constructor(
val client: OkHttpClient,
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
companion object {
fun newRealCall(
client: OkHttpClient,
originalRequest: Request,
forWebSocket: Boolean
): RealCall {
return RealCall(client, originalRequest, forWebSocket).apply {
transmitter = Transmitter(client, this)
调用newCall()方法,可以看到实例化了RealCall并且transmitter = Transmitter(client, this)实例化了RealCall中的transmitter,并返回了RealCall本身
接下来看execute
override fun execute(): Response {
synchronized(this) {
check(!executed) { "Already Executed" }
executed = true
transmitter.timeoutEnter()
transmitter.callStart()
try {
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
} finally {
client.dispatcher.finished(this)
fun callStart() {
this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
eventListener.callStart(call)
class Dispatcher constructor() {
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
return executorServiceOrNull!!
private val runningSyncCalls = ArrayDeque<RealCall>()
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
@Throws(IOException::class)
fun getResponseWithInterceptorChain(): Response {
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
interceptors += CallServerInterceptor(forWebSocket)
val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
if (transmitter.isCanceled) {
response.closeQuietly()
throw IOException("Canceled")
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw transmitter.noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null)
RealInterceptorChain类
@Throws(IOException::class)
fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
if (index >= interceptors.size) throw AssertionError()
calls++
check(this.exchange == null || this.exchange.connection()!!.supportsUrl(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
check(this.exchange == null || calls <= 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
val next = RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
check(exchange == null || index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
check(response.body != null) { "interceptor $interceptor returned a response with no body" }
return response
关键代码1:transmitter.callStart(),中最终调用了eventListener.callStart(call) 设置监听回调。
关键代码2: client.dispatcher.executed(this)
做请求一次的判断跟标记,如果!executed == true,则抛出异常,false则是标记为true``client.dispatcher.executed(this),Dispatcher中调用executed将其添加到runningSyncCalls数组双端队列中
关键代码3 getResponseWithInterceptorChain()这个是最终获取到数据Response的调用,方法中设置的各种拦截器,包括cookie cache等,默认cookie为CookieJar.NO_COOKIES
关键代码4 真正处理代码拦截器的地方,这里使用责任链的设计模式,在// Call the next interceptor in the chain.// 关键代码5 这个地方一直调用链表的下个拦截器的 intercept()方法,递归的形式.
关键代码5 我们可以看到,最新放入的拦截器最先处理, 优先我们设置的拦截器
异步请求方式
创建OkHttpClient对象
var client = OkHttpClient.Builder()
.connectTimeout(5, TimeUnit.SECONDS)
.build()
创建Request对象
var request = Request.Builder().url("https://www.wanandroid.com/article/list/0/json").build()
ioScope.launch(handler) {
client.newCall(request).enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
Log.d(TAG, "onFailure" + e.message)
override fun onResponse(call: Call, response: Response) {
Log.d(TAG, "onResponse" + response.message)
复制代码
以上的操作是基本的实例代码 :
步骤如下:
创建 OkHttpClient
创建请求对象 Request
调newCall()方法返回一个Call对象
调用enqueue()方法,并添加内部实现类Callback,实现回调的两个方法
直接从RealCall的 enqueue()入手
override fun enqueue(responseCallback: Callback) {
synchronized(this) {
check(!executed) { "Already Executed" }
executed = true
transmitter.callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback))
这里有两点要跟 enqueue()方法跟AsyncCall()类,先看下面AsyncCall()类的
internal inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
@Volatile private var callsPerHost = AtomicInteger(0)
fun callsPerHost(): AtomicInteger = callsPerHost
fun executeOn(executorService: ExecutorService) {
var success = false
try {
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
} finally {
if (!success) {
client.dispatcher.finished(this)
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
transmitter.timeoutEnter()
try {
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
} else {
responseCallback.onFailure(this@RealCall, e)
} finally {
client.dispatcher.finished(this)
AsyncCall实现了Runnable接口,实现run()方法 线程池跑的时候就是跑run里面的逻辑。这里调用了getResponseWithInterceptorChain()这里上面有讲到是去进行真正网络请求的地方。然后将获取回来的结果通过传进来的responseCallback,回调回去。
client.dispatcher.enqueue(AsyncCall(responseCallback))的AsyncCall()我们来讲讲enqueue()这个方法
/////////////////////////////////////////////
class Dispatcher constructor() {
// ...省略其它代码
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call) // 关键代码1
// 关键代码 2
if (!call.get().forWebSocket) {
val existingCall = findExistingCallWithHost(call.host())
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
// 关键代码3
promoteAndExecute()
//////////////////////////////////////////////////////////////////////////////////////
// 关键代码3
private fun promoteAndExecute(): Boolean {
assert(!Thread.holdsLock(this))
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
// 关键代码4
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
asyncCall.callsPerHost().incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
isRunning = runningCallsCount() > 0
// 关键代码5
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
// 关键代码 6
asyncCall.executeOn(executorService)
return isRunning
关键代码1 调用Dispatcher类中的enqueue,把call放入readyAsyncCalls 这里补充一点关于这几个Call的数组对列分别记录什么:
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
private val runningSyncCalls = ArrayDeque<RealCall>()
关键代码2 将其他的Call里面的callsPerHost的值传给它 (这里是为了后面做判断主机的而做的操作,主机数超过5时暂时不放到运行数组队列中)
关键代码3 4 promoteAndExecute这个比较重点,从准备好的readyAsyncCalls轮训获取出每个AsyncCalls,如果RunningAsyncCalls的总数大于this.maxRequestsPerHost(64)则停止轮训,或者 asyncCall.callsPerHost().get()的值超过this.maxRequests(5)则忽略这次的逻辑。
其逻辑主要就是将准备态的ReadyAsyncCalls的AsyncCall添加到runningAsyncCalls中,并从ReadyAsyncCalls删除。
关键代码5 轮训使用线程池(下面有代码展示)运行readyAsyncCalls里面的符合目前逻辑要求的asyncCall进行运行。
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
return executorServiceOrNull!!
跟着 走一下关键代码6的逻辑asyncCall.executeOn(executorService)
fun executeOn(executorService: ExecutorService) {
assert(!Thread.holdsLock(client.dispatcher))
var success = false
try {
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
transmitter.noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
client.dispatcher.finished(this)
这里调用了 executorService.execute(this)至此 便调用到了AsyncCall 的run方法,这里可以看一下 上面源码的 // 先分析一下AsyncCall这块调用
总结: OkHttp通过建造者模式创建OkHttpClient、Request和Response,在通过创建Call发起同步异步请求,Dispatcher作为中转点,做Call的请求队列的维护为主,使用线程池对Call的请求队列进行调度运行,最后通过各种拦截器(责任链模式) 最终请求网络返回数据信息