接上篇的 Volley 源码 解析,目前项目中更多的用到的是 OkHttpUtils OkHttp 所以有必要了解它的原理,以便遇到网络相关的问题时,可以及时的定位并解决问题,下面就开始吧。

本文的目录大致是这样:

  • OkHttp 的基本使用
  • OkHttp 的源码解析(V3.5.0)
  • OkHttp 连接池复用
  • OkHttp 的优缺点
  • OkHttp 的基本使用

    在 gradle 中添加依赖

    1
    compile 'com.squareup.okhttp3:okhttp:3.5.0'

    1.首先创建OkHttpClient

    1
    OkHttpClient client = new OkHttpClient();

    2.构造Request对象

    1
    2
    3
    4
    Request request = new Request.Builder()
    .get()
    .url("https://www.baidu.com")
    .build();

    3.将Request封装为Call

    1
    Call call = client.newCall(request);

    4.根据需要调用同步或者异步请求方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    //同步调用,返回Response,会抛出IO异常
    Response response = call.execute();

    //异步调用,并设置回调函数
    call.enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {

    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {
    Log.e("=====Younger==", "===" + (Looper.myLooper() == Looper.getMainLooper()));
    //打印出的结果是false , 可以看出 这个回调并没有回到主线程,需要我们自己处理线程切换的问题
    }
    });

    同步调用会阻塞主线程,一般不用

    异步调用的回调函数是在子线程,我们不能在子线程更新UI,需要借助于runOnUiThread()方法或者Handler来处理

    post 也是类似的, 相信大家都会用使用,接下来我们来看重头戏-源码。

    OkHttp 源码解析

    okHttpClient

    首先来看,我们进行网络请求时使用的方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    Call call = okHttpClient.newCall(request);

    实际调用

    @Override public Call newCall(Request request) {
    return new RealCall(this, request, false /* for web socket */);
    }


    new 了一个 RealCall,这是它的构造方法

    RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    this.client = client;
    this.originalRequest = originalRequest;
    this.forWebSocket = forWebSocket;
    this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
    }

    RealCall

    实际上的 Call 的 enqueue 调用的是 RealCall的 enqueue方法

    1
    call.enqueue(new ...);

    下面我们看下 RealCall的 enqueue是如何实现的

    1
    2
    3
    4
    5
    6
    7
    8
    @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
    }

    可以看到最终的请求处理是 dispatcher 来完成的,接下来看下 dispatcher

    dispatcher

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
     //最大并发请求书
    private int maxRequests = 64;
    //每个主机的最大请求数
    private int maxRequestsPerHost = 5;
    private Runnable idleCallback;

    /** 执行的线程池. Created lazily. */
    private ExecutorService executorService;

    //将要运行的异步请求队列
    /** Ready async calls in the order they'll be run. */
    private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

    //正在执行的异步请求队列
    /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
    private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
    //正在执行的同步请求队列
    /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
    private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();


    public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
    }

    public Dispatcher() {
    }

    public synchronized ExecutorService executorService() {
    if (executorService == null) {
    executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
    }

    Dispatcher 有两个构造方法,可以自己指定线程池, 如果没有指定, 则会默认创建默认线程池,可以看到核心数为0,缓存数可以是很大, 比较适合执行大量的耗时比较少的任务。

    接着看 enqueue是如何实现的

    1
    2
    3
    4
    5
    6
    7
    8
    9

    synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
    runningAsyncCalls.add(call);
    executorService().execute(call);
    } else {
    readyAsyncCalls.add(call);
    }
    }

    当正在运行的异步请求队列中的数量小于64, 并且 正在运行的请求主机数小于5,把请求加载到runningAsyncCalls 中并在线程池中执行, 否则就加入到 readyAsyncCalls 进行缓存等待。

    runningCallsForHost是如何实现的呢

    1
    2
    3
    4
    5
    6
    7
    8
    9

    /** Returns the number of running calls that share a host with {@code call}. */
    private int runningCallsForHost(AsyncCall call) {
    int result = 0;
    for (AsyncCall c : runningAsyncCalls) {
    if (c.host().equals(call.host())) result ++;
    }
    return result;
    }

    正在执行的网络请求中 同一个host最多只能是5个。

    上面可以看到传递进来的是 AsyncCall 然后 execute 那我们看下 AsyncCall方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
    super("OkHttp %s", redactedUrl());
    this.responseCallback = responseCallback;
    }

    String host() {
    return originalRequest.url().host();
    }

    Request request() {
    return originalRequest;
    }

    RealCall get() {
    return RealCall.this;
    }

    @Override protected void execute() {
    boolean signalledCallback = false;
    try {
    //获取请求报文
    Response response = getResponseWithInterceptorChain();
    if (retryAndFollowUpInterceptor.isCanceled()) {
    signalledCallback = true;
    responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
    } else {
    signalledCallback = true;
    responseCallback.onResponse(RealCall.this, response);
    }
    } catch (IOException e) {
    if (signalledCallback) {
    // Do not signal the callback twice!
    Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
    } else {
    responseCallback.onFailure(RealCall.this, e);
    }
    } finally {
    client.dispatcher().finished(this);
    }
    }
    }

    看到 NamedRunnable 实现了 Runnable,AsyncCall 中的 execute 是对网络请求的具体处理。

    1
    Response response = getResponseWithInterceptorChain();

    能明显看出这就是对请求的处理,在看它的具体实现之前先看下 client.dispatcher().finished 的方法实现。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
      /** Used by {@code AsyncCall#run} to signal completion. */
    void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
    }

    /** Used by {@code Call#execute} to signal completion. */
    void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
    }

    // 最后调用这个
    private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
    if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
    if (promoteCalls) promoteCalls();
    runningCallsCount = runningCallsCount();
    idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
    idleCallback.run();
    }
    }

    由于 promoteCalls 是true 我们看下 promoteCalls 的方法实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17

    private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
    AsyncCall call = i.next();

    if (runningCallsForHost(call) < maxRequestsPerHost) {
    i.remove();
    runningAsyncCalls.add(call);
    executorService().execute(call);
    }

    if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
    }

    根据代码可以明显看出 , 当一个请求结束了调用 finished 方法,最终到promoteCalls就是把 异步等待队列中的请求,取出放到 异步执行队列中。

  • 如果异步执行队列已经是满的状态就不加了,return
  • 如果 异步等待队列中 没有需要执行的网络请求 也就没有必要进行下一步了 return
  • 上面的两条都没遇到,遍历 异步等待队列,取出队首的请求,如果这个请求的 host 符合 (正在执行的网络请求中 同一个host最多只能是5个)的这个条件, 把 等待队列的这个请求移除, 加入到 正在执行的队列中, 线程开始执行。 如果不符合继续 遍历操作。
  • interceptors 拦截器

    接着看 RealCall 的 getResponseWithInterceptorChain 方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31

    Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    //用户自己定义的拦截器
    interceptors.addAll(client.interceptors());
    //系统提供的重试拦截器,失败后的重试和重定向
    interceptors.add(retryAndFollowUpInterceptor);
    //负责把用户构造的请求转换为发送到服务器的请求 、把服务器返回的响应转换为用户友好的响应 处理 配置请求头等信息
    //从应用程序代码到网络代码的桥梁。首先,它根据用户请求构建网络请求。然后它继续呼叫网络。最后,它根据网络响应构建用户响应。
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //处理 缓存配置 根据条件(存在响应缓存并被设置为不变的或者响应在有效期内)返回缓存响应
    //设置请求头(If-None-Match、If-Modified-Since等) 服务器可能返回304(未修改)
    //可配置用户自己设置的缓存拦截器
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //连接拦截器 这里才是真正的请求网络
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
    //配置okhttpClient 时设置的networkInterceptors
    //返回观察单个网络请求和响应的不可变拦截器列表。
    interceptors.addAll(client.networkInterceptors());
    }
    //执行流操作(写出请求体、获得响应数据) 负责向服务器发送请求数据、从服务器读取响应数据
    //进行http请求报文的封装与请求报文的解析
    interceptors.add(new CallServerInterceptor(forWebSocket));
    //创建责任链
    Interceptor.Chain chain = new RealInterceptorChain(
    interceptors, null, null, null, 0, originalRequest);
    //执行 责任链
    return chain.proceed(originalRequest);
    }

    看下 RealInterceptorChain 的实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28


    public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
    Connection connection) throws IOException {

    if (index >= interceptors.size()) throw new AssertionError();

    calls++;
    //创建新的拦截链,链中的拦截器集合index+1
    RealInterceptorChain next = new RealInterceptorChain(
    interceptors, streamAllocation, httpCodec, connection, index + 1, request);
    // 执行当前的拦截器
    Interceptor interceptor = interceptors.get(index);
    // 执行拦截器
    Response response = interceptor.intercept(next);

    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
    throw new IllegalStateException("network interceptor " + interceptor
    + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
    throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    return response;
    }

    根据上面的代码 我们可以看出,新建了一个RealInterceptorChain 责任链 并且 index+1,然后 执行interceptors.get(index); 返回Response。

    责任链中每个拦截器都会执行chain.proceed()方法之前的代码,等责任链最后一个拦截器执行完毕后会返回最终的响应数据,而chain.proceed() 方法会得到最终的响应数据,这时就会执行每个拦截器的chain.proceed()方法之后的代码,其实就是对响应数据的一些操作。

    接下来看下各个拦截器的具体代码

    RetryAndFollowUpInterceptor

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();

    streamAllocation = new StreamAllocation(
    client.connectionPool(), createAddress(request.url()), callStackTrace);

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
    if (canceled) {
    streamAllocation.release();
    throw new IOException("Canceled");
    }

    Response response = null;
    boolean releaseConnection = true;
    try {
    response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
    releaseConnection = false;
    } catch (RouteException e) {
    // The attempt to connect via a route failed. The request will not have been sent.
    if (!recover(e.getLastConnectException(), false, request)) {
    throw e.getLastConnectException();
    }
    //如果出现异常 不释放连接, 继续重试
    releaseConnection = false;
    continue;
    } catch (IOException e) {
    // An attempt to communicate with a server failed. The request may have been sent.
    boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
    //如果出现异常 不释放连接, 继续重试
    if (!recover(e, requestSendStarted, request)) throw e;
    releaseConnection = false;
    continue;
    } finally {
    // We're throwing an unchecked exception. Release any resources.
    if (releaseConnection) {
    streamAllocation.streamFailed(null);
    streamAllocation.release();
    }
    }

    // Attach the prior response if it exists. Such responses never have a body.
    if (priorResponse != null) {
    response = response.newBuilder()
    .priorResponse(priorResponse.newBuilder()
    .body(null)
    .build())
    .build();
    }

    Request followUp = followUpRequest(response);

    if (followUp == null) {
    if (!forWebSocket) {
    streamAllocation.release();
    }
    return response;
    }

    closeQuietly(response.body());

    //重试次数大于20次 ,不再试了,释放连接,
    if (++followUpCount > MAX_FOLLOW_UPS) {
    streamAllocation.release();
    throw new ProtocolException("Too many follow-up requests: " + followUpCount);
    }

    if (followUp.body() instanceof UnrepeatableRequestBody) {
    streamAllocation.release();
    throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
    }

    if (!sameConnection(response, followUp.url())) {
    streamAllocation.release();
    streamAllocation = new StreamAllocation(
    client.connectionPool(), createAddress(followUp.url()), callStackTrace);
    } else if (streamAllocation.codec() != null) {
    throw new IllegalStateException("Closing the body of " + response
    + " didn't close its backing stream. Bad interceptor?");
    }

    request = followUp;
    priorResponse = response;
    }
    }

    当发生 RouteException 和 IOException 都会进行 recover 重试。

    BridgeInterceptor

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    @Override public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();

    RequestBody body = userRequest.body();
    if (body != null) {
    MediaType contentType = body.contentType();
    if (contentType != null) {
    requestBuilder.header("Content-Type", contentType.toString());
    }

    long contentLength = body.contentLength();
    if (contentLength != -1) {
    requestBuilder.header("Content-Length", Long.toString(contentLength));
    requestBuilder.removeHeader("Transfer-Encoding");
    } else {
    requestBuilder.header("Transfer-Encoding", "chunked");
    requestBuilder.removeHeader("Content-Length");
    }
    }

    if (userRequest.header("Host") == null) {
    requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }

    if (userRequest.header("Connection") == null) {
    requestBuilder.header("Connection", "Keep-Alive");
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null) {
    transparentGzip = true;
    requestBuilder.header("Accept-Encoding", "gzip");
    }

    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
    requestBuilder.header("Cookie", cookieHeader(cookies));
    }

    if (userRequest.header("User-Agent") == null) {
    requestBuilder.header("User-Agent", Version.userAgent());
    }

    Response networkResponse = chain.proceed(requestBuilder.build());

    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

    Response.Builder responseBuilder = networkResponse.newBuilder()
    .request(userRequest);

    if (transparentGzip
    && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
    && HttpHeaders.hasBody(networkResponse)) {
    GzipSource responseBody = new GzipSource(networkResponse.body().source());
    Headers strippedHeaders = networkResponse.headers().newBuilder()
    .removeAll("Content-Encoding")
    .removeAll("Content-Length")
    .build();
    responseBuilder.headers(strippedHeaders);
    responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
    }

    return responseBuilder.build();
    }

    能看出 BridgeInterceptor 主要做的就是
    在请求发出之前 把请求的 信息拿出来处理成Request.Builder.header 发送出去
    当请求结果回来之后,处理header 信息。处理返回的信息。

    缓存拦截器

    CacheInterceptor

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
     @Override public Response intercept(Chain chain) throws IOException {
    Response cacheCandidate = cache != null
    ? cache.get(chain.request())
    : null;

    long now = System.currentTimeMillis();

    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;
    //如果networkRequest == null 则说明不使用网络请求
    //获取缓存中(CacheStrategy)的Response
    if (cache != null) {
    cache.trackResponse(strategy);
    }

    //缓存无效 关闭资源
    if (cacheCandidate != null && cacheResponse == null) {
    closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    //networkRequest == null 不使用网络请求 且没有缓存 cacheResponse == null 返回失败
    if (networkRequest == null && cacheResponse == null) {
    return new Response.Builder()
    .request(chain.request())
    .protocol(Protocol.HTTP_1_1)
    .code(504)
    .message("Unsatisfiable Request (only-if-cached)")
    .body(Util.EMPTY_RESPONSE)
    .sentRequestAtMillis(-1L)
    .receivedResponseAtMillis(System.currentTimeMillis())
    .build();
    }

    // If we don't need the network, we're done.
    //如果无需网络请求, 把缓存中的结果取出来组装成返回体 返回
    if (networkRequest == null) {
    return cacheResponse.newBuilder()
    .cacheResponse(stripBody(cacheResponse))
    .build();
    }

    Response networkResponse = null;
    try {
    //进行网络请求
    networkResponse = chain.proceed(networkRequest);
    } finally {
    // If we're crashing on I/O or otherwise, don't leak the cache body.
    if (networkResponse == null && cacheCandidate != null) {
    closeQuietly(cacheCandidate.body());
    }
    }

    //网络请求结果回来了,根据情况更新缓存结果
    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
    if (networkResponse.code() == HTTP_NOT_MODIFIED) {
    Response response = cacheResponse.newBuilder()
    .headers(combine(cacheResponse.headers(), networkResponse.headers()))
    .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
    .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
    .cacheResponse(stripBody(cacheResponse))
    .networkResponse(stripBody(networkResponse))
    .build();
    networkResponse.body().close();

    // Update the cache after combining headers but before stripping the
    // Content-Encoding header (as performed by initContentStream()).
    cache.trackConditionalCacheHit();
    cache.update(cacheResponse, response);
    return response;
    } else {
    closeQuietly(cacheResponse.body());
    }
    }

    Response response = networkResponse.newBuilder()
    .cacheResponse(stripBody(cacheResponse))
    .networkResponse(stripBody(networkResponse))
    .build();

    if (HttpHeaders.hasBody(response)) {
    CacheRequest cacheRequest = maybeCache(response, networkResponse.request(), cache);
    response = cacheWritingResponse(cacheRequest, response);
    }

    return response;
    }

    如果用户自己配置了缓存拦截器,cacheCandidate = cache.Response 获取用户自己存储的Response,否则 cacheCandidate = null;同时从CacheStrategy 获取cacheResponse 和 networkRequest

    如果cacheCandidate != null 而 cacheResponse == null 说明缓存无效清除cacheCandidate缓存。

    如果networkRequest == null 说明没有网络,cacheResponse == null 没有缓存,返回失败的信息,责任链此时也就终止,不会在往下继续执行。

    如果networkRequest == null 说明没有网络,cacheResponse != null 有缓存,返回缓存的信息,责任链此时也就终止,不会在往下继续执行。

    执行下一个拦截器,也就是请求网络

    责任链执行完毕后,会返回最终响应数据,如果缓存存在更新缓存,如果缓存不存在加入到缓存中去。

    ConnectInterceptor

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
    }

    连接复用的逻辑就是这里面, 寻找可用的链接, 复用, 这个待会分析。

    networkInterceptors

    这个是自定义的网络拦截器

    CallServerInterceptor

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    @Override public Response intercept(Chain chain) throws IOException {
    //HttpStream 就是先前在 ConnectInterceptor 创建出来的
    HttpCodec httpCodec = ((RealInterceptorChain) chain).httpStream();
    StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
    Request request = chain.request();
    /发送请求的时间戳
    long sentRequestMillis = System.currentTimeMillis();
    //写入请求头信息
    httpCodec.writeRequestHeaders(request);
    //写入请求体信息(有请求体的情况)
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
    Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
    BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
    request.body().writeTo(bufferedRequestBody);
    bufferedRequestBody.close();
    }
    //结束请求
    httpCodec.finishRequest();
    //读取响应头信息
    Response response = httpCodec.readResponseHeaders()
    .request(request)
    //握手?
    .handshake(streamAllocation.connection().handshake())
    .sentRequestAtMillis(sentRequestMillis)
    .receivedResponseAtMillis(System.currentTimeMillis())
    .build();
    //openResponseBody 获取响应体信息
    int code = response.code();
    //app 不走这个
    if (forWebSocket && code == 101) {
    // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
    response = response.newBuilder()
    .body(Util.EMPTY_RESPONSE)
    .build();
    } else {
    response = response.newBuilder()
    .body(httpCodec.openResponseBody(response))
    .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
    || "close".equalsIgnoreCase(response.header("Connection"))) {
    streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
    throw new ProtocolException(
    "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
    }

    OkhttpClient 实现了Call.Fctory,负责为Request 创建 Call;

    RealCall 为Call的具体实现,其enqueue() 异步请求接口通过Dispatcher()调度器利用ExcutorService实现,而最终进行网络请求时和同步的execute()接口一致,都是通过 getResponseWithInterceptorChain() 函数实现

    getResponseWithInterceptorChain() 中利用 Interceptor 链条,责任链模式 分层实现缓存、透明压缩、网络 IO 等功能;最终将响应数据返回给用户。

    OkHttp 连接池复用

    我们知道 OkHttp 支持5个并发 socket 连接,默认keepAlive 时间为5分钟。 那究竟是怎么做到的呢

    在 ConnectInterceptor 中我们知道 newStream

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
    ...

    try {
    找“健康的”RealConnection
    RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
    writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);

    HttpCodec resultCodec;
    if (resultConnection.http2Connection != null) {
    resultCodec = new Http2Codec(client, this, resultConnection.http2Connection);
    } else {
    // 通过RealConnection创建HttpCodec
    resultConnection.socket().setSoTimeout(readTimeout);
    resultConnection.source.timeout().timeout(readTimeout, MILLISECONDS);
    resultConnection.sink.timeout().timeout(writeTimeout, MILLISECONDS);
    resultCodec = new Http1Codec(
    client, this, resultConnection.source, resultConnection.sink);
    }

    synchronized (connectionPool) {
    codec = resultCodec;
    return resultCodec;
    }
    } catch (IOException e) {
    throw new RouteException(e);
    }
    }


    private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
    int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
    throws IOException {
    while (true) {
    // while循环直到找到return
    RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
    connectionRetryEnabled);

    ...

    return candidate;
    }
    }




    private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
    boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    synchronized (connectionPool) {
    ...
    // 如果为空,尝试从连接池中获取,这个方法的关键点,如果获取到connection不为空(第三个参数为this,找到合适的RealConnection赋值到connection

    // Attempt to get a connection from the pool.
    RealConnection pooledConnection = Internal.instance.get(connectionPool, address, this);
    if (pooledConnection != null) {
    this.connection = pooledConnection;
    return pooledConnection;
    }

    selectedRoute = route;
    }


    if (selectedRoute == null) {
    selectedRoute = routeSelector.next();
    synchronized (connectionPool) {
    route = selectedRoute;
    refusedStreamCount = 0;
    }
    }
    // 如果没有找到,则创建一个
    RealConnection newConnection = new RealConnection(selectedRoute);

    synchronized (connectionPool) {
    acquire(newConnection);
    Internal.instance.put(connectionPool, newConnection);
    this.connection = newConnection;
    if (canceled) throw new IOException("Canceled");
    }
    // 进行实际的的网络连接
    newConnection.connect(connectTimeout, readTimeout, writeTimeout, address.connectionSpecs(),
    connectionRetryEnabled);
    routeDatabase().connected(newConnection.route());

    return newConnection;
    }

    从上面的分析,获取RealConnection的流程,总结如下:

    在ConnectInterceptor中获取StreamAllocation的引用,通过StreamAllocation去寻找RealConnection

    如果RealConnection不为空,那么直接返回。否则去连接池中寻找并返回,如果找不到直接创建并设置到连接池中,然后再进一步判断是否重复释放到Socket。

    在实际网络连接connect中,选择不同的链接方式(有隧道链接(Tunnel)和管道链接(Socket))
    把RealConnection和HttpCodec传递给下一个拦截器

    在从连接池中获取一个连接的时候,使用了 Internal 的 get() 方法。Internal 有一个静态的实例,会在 OkHttpClient 的静态代码快中被初始化。我们会在 Internal 的 get() 中调用连接池的 get() 方法来得到一个连接。并且,从中我们明白了连接复用的一个好处就是省去了进行 TCP 和 TLS 握手的一个过程。因为建立连接本身也是需要消耗一些时间的,连接被复用之后可以提升我们网络访问的效率。

    ConnectionPool

    连接池的位于 ConnectionPool 中

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    /** 空闲 socket 最大连接数 */
    private final int maxIdleConnections;
    socket 的 keepAlive 时间
    private final long keepAliveDurationNs;
    private final Deque<RealConnection> connections = new ArrayDeque<>();
    final RouteDatabase routeDatabase = new RouteDatabase();
    boolean cleanupRunning;


    public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
    }

    public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
    if (keepAliveDuration <= 0) {
    throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
    }

    构造方法可以看到,空闲socket的最大连接数为5个,ConnectionPool是在 OkHttpClient 实例化时创建的。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    RealConnection get(Address address, StreamAllocation streamAllocation) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
    if (connection.allocations.size() < connection.allocationLimit
    && address.equals(connection.route().address)
    && !connection.noNewStreams) {
    streamAllocation.acquire(connection);
    return connection;
    }
    }
    return null;
    }

    void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
    cleanupRunning = true;
    executor.execute(cleanupRunnable);
    }
    添加到 Deque 之前需要清理空闲的线程,
    connections.add(connection);
    }

    看下 put,get 方法,get 方法会遍历 connection 缓存列表, 当某个连接计数小于限制的大小,并且 request 的地址和缓存列表中此链接的地址完全匹配时, 则直接复用缓存列表中的 connection 作为request 的连接。

    上面可以看到 put 方法会调用清理线程。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
    while (true) {
    long waitNanos = cleanup(System.nanoTime());
    if (waitNanos == -1) return;
    if (waitNanos > 0) {
    long waitMillis = waitNanos / 1000000L;
    waitNanos -= (waitMillis * 1000000L);
    synchronized (ConnectionPool.this) {
    try {
    ConnectionPool.this.wait(waitMillis, (int) waitNanos);
    } catch (InterruptedException ignored) {
    }
    }
    }
    }
    }
    };
    线程会不停的调用cleanup方法进行清理, 并返回下次需要清理的间隔时间, 然后调用 wait方法进行等待,当时间到了之后再次进行清理。 一直这样下去。

    会调用 cleanup方法,下面是cleanup方法


    long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

    // Find either a connection to evict, or the time that the next eviction is due.
    synchronized (this) {
    for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
    RealConnection connection = i.next();

    // If the connection is in use, keep searching.
    if (pruneAndGetAllocationCount(connection, now) > 0) {
    inUseConnectionCount++;
    continue;
    }

    idleConnectionCount++;

    // If the connection is ready to be evicted, we're done.
    long idleDurationNs = now - connection.idleAtNanos;
    if (idleDurationNs > longestIdleDurationNs) {
    longestIdleDurationNs = idleDurationNs;
    longestIdleConnection = connection;
    }
    }

    if (longestIdleDurationNs >= this.keepAliveDurationNs
    || idleConnectionCount > this.maxIdleConnections) {
    // We've found a connection to evict. Remove it from the list, then close it below (outside
    // of the synchronized block).
    connections.remove(longestIdleConnection);
    } else if (idleConnectionCount > 0) {
    // A connection will be ready to evict soon.
    return keepAliveDurationNs - longestIdleDurationNs;
    } else if (inUseConnectionCount > 0) {
    // All connections are in use. It'll be at least the keep alive duration 'til we run again.
    return keepAliveDurationNs;
    } else {
    // No connections, idle or in use.
    cleanupRunning = false;
    return -1;
    }
    }

    closeQuietly(longestIdleConnection.socket());

    // Cleanup again immediately.
    return 0;
    }

    cleanup方法的过程是 根据连接中的引用计数来计算空闲连接数和活跃连接数,,然后标记出空闲连接数。
    如果空闲连接keepAlive 时间超过5分钟,或者空闲连接数超过5个,则从Deque 中移除次连接,
    如果空闲连接数大于0,则返回此连接即将到期的时间,如果都是活跃连接,并大于0,则返回5分钟。 如果没有任何连接,则返回-1,

    清除算法,使用类似GC中的引用计算算法,如果弱引用StreamAllocation列表为0,则表示空闲需要进行回收。

    可以看出连接池复用的核心就是用 Deque 来存储连接, 通过 put,get 等来对 Deque 进行操作, 另外通过判断连接中的技术对象 StreamAllocation 来进行自动回收连接。

    OkHttp 的优缺点

    优点:

  • 1、支持 HTTP/2,允许连接同一主机的所有请求分享一个 socket。 如果 HTTP/2 不可用,会使用连接池减少请求延迟。
  • 2、使用GZIP压缩下载内容,且压缩操作对用户是透明的。
  • 3、利用响应缓存来避免重复的网络请求。
  • 4、如果你的服务端有多个IP地址,当第一个地址连接失败时,OKHttp会尝试连接其他的地址,这对IPV4和IPV6以及寄宿在多个数据中心的服务而言,是非常有必要的。
  • 5、用户可自主定制拦截器,实现自己想要的网络拦截。
  • 6、支持大文件的上传和下载。
  • 7、支持cookie持久化。
  • 8、支持自签名的https链接,配置有效证书即可。
  • 9、支持Headers的缓存策略减少重复的网络请求。
  • 1、网络请求的回调是子线程,需要用户手动操作发送到主线程。
  • 2、参数较多,配置起来复杂。
  • 所以综合上面的缺点,OkHttpUtils 及类似的 封装应用而生。下一篇我们来通过 OkHttpUtils源码解析 看下是如何封装并解决这些问题的。

  • Android 进阶之光
  • https://blog.piasy.com/2016/07/11/Understand-OkHttp/index.html
  • https://juejin.im/post/5c1b23b9e51d4529096aaaee
  •