歡迎光臨
每天分享高質量文章

【追光者系列】HikariCP原始碼分析之evict、時鐘回撥、連接創建生命周期

摘自【工匠小豬豬的技術世界】 1.這是一個系列,有興趣的朋友可以持續關註 2.如果你有HikariCP使用上的問題,可以給我留言,我們一起溝通討論 3.希望大家可以提供我一些案例,我也希望可以支持你們做一些調優


概念

evict定義在com.zaxxer.hikari.pool.PoolEntry中,evict的漢語意思是驅逐、逐出,用來標記連接池中的連接不可用。

  1. private volatile boolean evict;

  2. boolean isMarkedEvicted() {

  3.      return evict;

  4.   }

  5.   void markEvicted() {

  6.      this.evict = true;

  7.   }

getConnection

在每次getConnection的時候,borrow連接(PoolEntry)的時候,如果是標記evict的,則會關閉連接,更新timeout的值,重新迴圈繼續獲取連接

  1. /**

  2.    * Get a connection from the pool, or timeout after the specified number of milliseconds.

  3.    *

  4.    * @param hardTimeout the maximum time to wait for a connection from the pool

  5.    * @return a java.sql.Connection instance

  6.    * @throws SQLException thrown if a timeout occurs trying to obtain a connection

  7.    */

  8.   public Connection getConnection(final long hardTimeout) throws SQLException {

  9.      suspendResumeLock.acquire();

  10.      final long startTime = currentTime();

  11.      try {

  12.         long timeout = hardTimeout;

  13.         do {

  14.            PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);

  15.            if (poolEntry == null) {

  16.               break; // We timed out... break and throw exception

  17.            }

  18.            final long now = currentTime();

  19.            if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > ALIVE_BYPASS_WINDOW_MS && !isConnectionAlive(poolEntry.connection))) {

  20.               closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);

  21.               timeout = hardTimeout - elapsedMillis(startTime);

  22.            }

  23.            else {

  24.               metricsTracker.recordBorrowStats(poolEntry, startTime);

  25.               return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);

  26.            }

  27.         } while (timeout > 0L);

  28.         metricsTracker.recordBorrowTimeoutStats(startTime);

  29.         throw createTimeoutException(startTime);

  30.      }

  31.      catch (InterruptedException e) {

  32.         Thread.currentThread().interrupt();

  33.         throw new SQLException(poolName + " - Interrupted during connection acquisition", e);

  34.      }

  35.      finally {

  36.         suspendResumeLock.release();

  37.      }

  38.   }

如下我們聚焦一下原始碼,hardTimeout預設值是30000,這個值實際上就是connectionTimeout,建構式預設值是SECONDS.toMillis(30) = 30000,預設配置validate之後的值是30000,validate重置以後是如果小於250毫秒,則被重置回30秒。

connectionTimeout  This property controls the maximum number of milliseconds that a client (that’s you) will wait for a connection from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown. Lowest acceptable connection timeout is 250 ms. Default: 30000 (30 seconds)

  1. if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > ALIVE_BYPASS_WINDOW_MS && !isConnectionAlive(poolEntry.connection))) {

  2.               closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);

  3.               timeout = hardTimeout - elapsedMillis(startTime);

  4.            }

關閉連接這塊的原始碼如下,從註釋可以看到(閱讀hikari原始碼強烈建議看註釋),這是永久關閉真實(底層)連接(吃掉任何異常):

  1. private static final String EVICTED_CONNECTION_MESSAGE = "(connection was evicted)";

  2.   private static final String DEAD_CONNECTION_MESSAGE = "(connection is dead)";

  3.   /**

  4.    * Permanently close the real (underlying) connection (eat any exception).

  5.    *

  6.    * @param poolEntry poolEntry having the connection to close

  7.    * @param closureReason reason to close

  8.    */

  9.   void closeConnection(final PoolEntry poolEntry, final String closureReason) {

  10.      if (connectionBag.remove(poolEntry)) {

  11.         final Connection connection = poolEntry.close();

  12.         closeConnectionExecutor.execute(() -> {

  13.            quietlyCloseConnection(connection, closureReason);

  14.            if (poolState == POOL_NORMAL) {

  15.               fillPool();

  16.            }

  17.         });

  18.      }

  19.   }

吃掉體現在quietlyCloseConnection,這是吃掉Throwable的

  1. // ***********************************************************************

  2.   //                           JDBC methods

  3.   // ***********************************************************************

  4.   void quietlyCloseConnection(final Connection connection, final String closureReason) {

  5.      if (connection != null) {

  6.         try {

  7.            LOGGER.debug("{} - Closing connection {}: {}", poolName, connection, closureReason);

  8.            try {

  9.               setNetworkTimeout(connection, SECONDS.toMillis(15));

  10.            }

  11.            finally {

  12.               connection.close(); // continue with the close even if setNetworkTimeout() throws

  13.            }

  14.         }

  15.         catch (Throwable e) {

  16.            LOGGER.debug("{} - Closing connection {} failed", poolName, connection, e);

  17.         }

  18.      }

  19.   }

createPoolEntry

這段代碼強烈建議看一下註釋,maxLifetime預設是1800000=30分鐘,就是讓每個連接的最大存活時間錯開一點,防止同時過期,加一點點隨機因素,防止一件事情大量同時發生(C大語錄)。

  1. // ***********************************************************************

  2.   //                           Private methods

  3.   // ***********************************************************************

  4.   /**

  5.    * Creating new poolEntry.  If maxLifetime is configured, create a future End-of-life task with 2.5% variance from

  6.    * the maxLifetime time to ensure there is no massive die-off of Connections in the pool.

  7.    */

  8.   private PoolEntry createPoolEntry() {

  9.      try {

  10.         final PoolEntry poolEntry = newPoolEntry();

  11.         final long maxLifetime = config.getMaxLifetime();

  12.         if (maxLifetime > 0) {

  13.            // variance up to 2.5% of the maxlifetime

  14.            final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0;

  15.            final long lifetime = maxLifetime - variance;

  16.            poolEntry.setFutureEol(houseKeepingExecutorService.schedule(

  17.               () -> {

  18.                  if (softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */)) {

  19.                     addBagItem(connectionBag.getWaitingThreadCount());

  20.                  }

  21.               },

  22.               lifetime, MILLISECONDS));

  23.         }

  24.         return poolEntry;

  25.      }

  26.      catch (Exception e) {

  27.         if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently

  28.            LOGGER.debug("{} - Cannot acquire connection from data source", poolName, (e instanceof ConnectionSetupException ? e.getCause() : e));

  29.         }

  30.         return null;

  31.      }

  32.   }

如果maxLifetime大於10000就是大於10秒鐘,就走這個策略,用maxLifetime的2.5%的時間和0之間的隨機數來隨機設定一個variance,在maxLifetime – variance之後觸發evict。 在創建poolEntry的時候,註冊一個延時任務,在連接存活將要到達maxLifetime之前觸發evit,用來防止出現大面積的connection因maxLifetime同一時刻失效。 標記為evict只是表示連接池中的該連接不可用,但還在連接池當中,還會被borrow出來,只是getConnection的時候判斷了,如果是isMarkedEvicted,則會從連接池中移除該連接,然後close掉。

evict Related

evictConnection

可以主動呼叫evictConnection,這裡也是判斷是不是用戶自己呼叫的或者從connectionBag中標記不可borrow成功,則關閉連接

  1. /**

  2.    * Evict a Connection from the pool.

  3.    *

  4.    * @param connection the Connection to evict (actually a {@link ProxyConnection})

  5.    */

  6.   public void evictConnection(Connection connection) {

  7.      ProxyConnection proxyConnection = (ProxyConnection) connection;

  8.      proxyConnection.cancelLeakTask();

  9.      try {

  10.         softEvictConnection(proxyConnection.getPoolEntry(), "(connection evicted by user)", !connection.isClosed() /* owner */);

  11.      }

  12.      catch (SQLException e) {

  13.         // unreachable in HikariCP, but we're still forced to catch it

  14.      }

  15.   }

softEvictConnection

  1. /**

  2.    * "Soft" evict a Connection (/PoolEntry) from the pool.  If this method is being called by the user directly

  3.    * through {@link com.zaxxer.hikari.HikariDataSource#evictConnection(Connection)} then {@code owner} is {@code true}.

  4.    *

  5.    * If the caller is the owner, or if the Connection is idle (i.e. can be "reserved" in the {@link ConcurrentBag}),

  6.    * then we can close the connection immediately.  Otherwise, we leave it "marked" for eviction so that it is evicted

  7.    * the next time someone tries to acquire it from the pool.

  8.    *

  9.    * @param poolEntry the PoolEntry (/Connection) to "soft" evict from the pool

  10.    * @param reason the reason that the connection is being evicted

  11.    * @param owner true if the caller is the owner of the connection, false otherwise

  12.    * @return true if the connection was evicted (closed), false if it was merely marked for eviction

  13.    */

  14.   private boolean softEvictConnection(final PoolEntry poolEntry, final String reason, final boolean owner) {

  15.      poolEntry.markEvicted();

  16.      if (owner || connectionBag.reserve(poolEntry)) {

  17.         closeConnection(poolEntry, reason);

  18.         return true;

  19.      }

  20.      return false;

  21.   }

com.zaxxer.hikari.util.ConcurrentBag

  1. /**

  2.    * The method is used to make an item in the bag "unavailable" for

  3.    * borrowing.  It is primarily used when wanting to operate on items

  4.    * returned by the values(int)``` method.  Items that are

  5.    * reserved can be removed from the bag via remove(T)```

  6.    * without the need to unreserve them.  Items that are not removed

  7.    * from the bag can be make available for borrowing again by calling

  8.    * the unreserve(T)``` method.

  9.    *

  10.    * @param bagEntry the item to reserve

  11.    * @return true if the item was able to be reserved, false otherwise

  12.    */

  13.   public boolean reserve(final T bagEntry) {

  14.      return bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED);

  15.   }

softEvictConnections

HikariPool中還提供了HikariPoolMXBean的softEvictConnections實現,實際上是呼叫softEvictConnection,owner指定false( not owner )

  1. public void softEvictConnections() {

  2.      connectionBag.values().forEach(poolEntry -> softEvictConnection(poolEntry, "(connection evicted)", false /* not owner */));

  3.   }

Mbean的softEvictConnections方法真正執行的是com.zaxxer.hikari.pool.HikariPool中softEvictConnections方法,這是一種“軟”驅逐池中連接的方法,如果呼叫方是owner身份,或者連接處於空閑狀態,可以立即關閉連接。否則,我們將其“標記”為驅逐,以便下次有人試圖從池中獲取它時將其逐出。

  1. public void softEvictConnections() {

  2.      connectionBag.values().forEach(poolEntry -> softEvictConnection(poolEntry, "(connection evicted)", false /* not owner */));

  3.   }

softEvictConnection

  1. /**

  2.    * "Soft" evict a Connection (/PoolEntry) from the pool.  If this method is being called by the user directly

  3.    * through {@link com.zaxxer.hikari.HikariDataSource#evictConnection(Connection)} then {@code owner} is {@code true}.

  4.    *

  5.    * If the caller is the owner, or if the Connection is idle (i.e. can be "reserved" in the {@link ConcurrentBag}),

  6.    * then we can close the connection immediately.  Otherwise, we leave it "marked" for eviction so that it is evicted

  7.    * the next time someone tries to acquire it from the pool.

  8.    *

  9.    * @param poolEntry the PoolEntry (/Connection) to "soft" evict from the pool

  10.    * @param reason the reason that the connection is being evicted

  11.    * @param owner true if the caller is the owner of the connection, false otherwise

  12.    * @return true if the connection was evicted (closed), false if it was merely marked for eviction

  13.    */

  14.   private boolean softEvictConnection(final PoolEntry poolEntry, final String reason, final boolean owner) {

  15.      poolEntry.markEvicted();

  16.      if (owner || connectionBag.reserve(poolEntry)) {

  17.         closeConnection(poolEntry, reason);

  18.         return true;

  19.      }

  20.      return false;

  21.   }

執行此方法時我們的owner預設傳false(not owner),呼叫com.zaxxer.hikari.util.ConcurrentBag的reserve對方進行保留

  1. /**

  2.    * The method is used to make an item in the bag "unavailable" for

  3.    * borrowing.  It is primarily used when wanting to operate on items

  4.    * returned by the values(int)``` method.  Items that are

  5.    * reserved can be removed from the bag via remove(T)```

  6.    * without the need to unreserve them.  Items that are not removed

  7.    * from the bag can be make available for borrowing again by calling

  8.    * the unreserve(T)``` method.

  9.    *

  10.    * @param bagEntry the item to reserve

  11.    * @return true if the item was able to be reserved, false otherwise

  12.    */

  13.   public boolean reserve(final T bagEntry) {

  14.      return bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED);

  15.   }

除了 HikariPoolMXBean的呼叫,softEvictConnections在housekeeper中也有使用

  1. /**

  2.    * The house keeping task to retire and maintain minimum idle connections.

  3.    */

  4.   private final class HouseKeeper implements Runnable {

  5.      private volatile long previous = plusMillis(currentTime(), -HOUSEKEEPING_PERIOD_MS);

  6.      @Override

  7.      public void run()

  8.      {

  9.         try {

  10.            // refresh timeouts in case they changed via MBean

  11.            connectionTimeout = config.getConnectionTimeout();

  12.            validationTimeout = config.getValidationTimeout();

  13.            leakTaskFactory.updateLeakDetectionThreshold(config.getLeakDetectionThreshold());

  14.            final long idleTimeout = config.getIdleTimeout();

  15.            final long now = currentTime();

  16.            // Detect retrograde time, allowing +128ms as per NTP spec.

  17.            if (plusMillis(now, 128) < plusMillis(previous, HOUSEKEEPING_PERIOD_MS)) {

  18.               LOGGER.warn("{} - Retrograde clock change detected (housekeeper delta={}), soft-evicting connections from pool.",

  19.                           poolName, elapsedDisplayString(previous, now));

  20.               previous = now;

  21.               softEvictConnections();

  22.               return;

  23.            }

  24.            else if (now > plusMillis(previous, (3 * HOUSEKEEPING_PERIOD_MS) / 2)) {

  25.               // No point evicting for forward clock motion, this merely accelerates connection retirement anyway

  26.               LOGGER.warn("{} - Thread starvation or clock leap detected (housekeeper delta={}).", poolName, elapsedDisplayString(previous, now));

  27.            }

  28.            previous = now;

  29.            String afterPrefix = "Pool ";

  30.            if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) {

  31.               logPoolState("Before cleanup ");

  32.               afterPrefix = "After cleanup  ";

  33.               final List<PoolEntry> notInUse = connectionBag.values(STATE_NOT_IN_USE);

  34.               int toRemove = notInUse.size() - config.getMinimumIdle();

  35.               for (PoolEntry entry : notInUse) {

  36.                  if (toRemove > 0 && elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) {

  37.                     closeConnection(entry, "(connection has passed idleTimeout)");

  38.                     toRemove--;

  39.                  }

  40.               }

  41.            }

  42.            logPoolState(afterPrefix);

  43.            fillPool(); // Try to maintain minimum connections

  44.         }

  45.         catch (Exception e) {

  46.            LOGGER.error("Unexpected exception in housekeeping task", e);

  47.         }

  48.      }

  49.   }

聚焦一下,這段代碼也是檢測時鐘回撥,如果時鐘在規定範圍外回撥了,就驅除連接,並重置時間。

  1. // Detect retrograde time, allowing +128ms as per NTP spec.

  2.            if (plusMillis(now, 128) < plusMillis(previous, HOUSEKEEPING_PERIOD_MS)) {

  3.               LOGGER.warn("{} - Retrograde clock change detected (housekeeper delta={}), soft-evicting connections from pool.",

  4.                           poolName, elapsedDisplayString(previous, now));

  5.               previous = now;

  6.               softEvictConnections();

  7.               return;

  8.            }

  9.  /**

  10.    * Return the specified opaque time-stamp plus the specified number of milliseconds.

  11.    *

  12.    * @param time an opaque time-stamp

  13.    * @param millis milliseconds to add

  14.    * @return a new opaque time-stamp

  15.    */

  16.   static long plusMillis(long time, long millis) {

  17.      return CLOCK.plusMillis0(time, millis);

  18.   }

說到時鐘回撥,是不是想起了snowflake里的時鐘回撥的處理?讓我們一起溫習一下!

  1. /**

  2. * 自生成Id生成器.

  3. *

  4. *

  5. * 長度為64bit,從高位到低位依次為

  6. *

  • *

  • *

  • * 1bit   符號位

  • * 41bits 時間偏移量從2016年11月1日零點到現在的毫秒數

  • * 10bits 工作行程Id

  • * 12bits 同一個毫秒內的自增量

  • *

  • *

  • *

  • * 工作行程Id獲取優先級: 系統變數{@code sjdbc.self.id.generator.worker.id} 大於 環境變數{@code SJDBC_SELF_ID_GENERATOR_WORKER_ID}

  • * ,另外可以呼叫@{@code CommonSelfIdGenerator.setWorkerId}進行設置

  • *

  • *

  • * @author gaohongtao

  • */

  • @Getter

  • @Slf4j

  • public class CommonSelfIdGenerator implements IdGenerator {

  •    public static final long SJDBC_EPOCH;//時間偏移量,從2016年11月1日零點開始

  •    private static final long SEQUENCE_BITS = 12L;//自增量占用比特

  •    private static final long WORKER_ID_BITS = 10L;//工作行程ID比特

  •    private static final long SEQUENCE_MASK = (1 << SEQUENCE_BITS) - 1;//自增量掩碼(最大值)

  •    private static final long WORKER_ID_LEFT_SHIFT_BITS = SEQUENCE_BITS;//工作行程ID左移比特數(位數)

  •    private static final long TIMESTAMP_LEFT_SHIFT_BITS = WORKER_ID_LEFT_SHIFT_BITS + WORKER_ID_BITS;//時間戳左移比特數(位數)

  •    private static final long WORKER_ID_MAX_VALUE = 1L << WORKER_ID_BITS;//工作行程ID最大值

  •    @Setter

  •    private static AbstractClock clock = AbstractClock.systemClock();

  •    @Getter

  •    private static long workerId;//工作行程ID

  •    static {

  •        Calendar calendar = Calendar.getInstance();

  •        calendar.set(2016, Calendar.NOVEMBER, 1);

  •        calendar.set(Calendar.HOUR_OF_DAY, 0);

  •        calendar.set(Calendar.MINUTE, 0);

  •        calendar.set(Calendar.SECOND, 0);

  •        calendar.set(Calendar.MILLISECOND, 0);

  •        SJDBC_EPOCH = calendar.getTimeInMillis();

  •        initWorkerId();

  •    }

  •    private long sequence;//最後自增量

  •    private long lastTime;//最後生成編號時間戳,單位:毫秒

  •    static void initWorkerId() {

  •        String workerId = System.getProperty("sjdbc.self.id.generator.worker.id");

  •        if (!Strings.isNullOrEmpty(workerId)) {

  •            setWorkerId(Long.valueOf(workerId));

  •            return;

  •        }

  •        workerId = System.getenv("SJDBC_SELF_ID_GENERATOR_WORKER_ID");

  •        if (Strings.isNullOrEmpty(workerId)) {

  •            return;

  •        }

  •        setWorkerId(Long.valueOf(workerId));

  •    }

  •    /**

  •     * 設置工作行程Id.

  •     *

  •     * @param workerId 工作行程Id

  •     */

  •    public static void setWorkerId(final Long workerId) {

  •        Preconditions.checkArgument(workerId >= 0L && workerId < WORKER_ID_MAX_VALUE);

  •        CommonSelfIdGenerator.workerId = workerId;

  •    }

  •    /**

  •     * 生成Id.

  •     *

  •     * @return 傳回@{@link Long}型別的Id

  •     */

  •    @Override

  •    public synchronized Number generateId() {

  •    //保證當前時間大於最後時間。時間回退會導致產生重覆id

  •        long time = clock.millis();

  •        Preconditions.checkState(lastTime <= time, "Clock is moving backwards, last time is %d milliseconds, current time is %d milliseconds", lastTime, time);

  •        // 獲取序列號

  •        if (lastTime == time) {

  •            if (0L == (sequence = ++sequence & SEQUENCE_MASK)) {

  •                time = waitUntilNextTime(time);

  •            }

  •        } else {

  •            sequence = 0;

  •        }

  •        // 設置最後時間戳

  •        lastTime = time;

  •        if (log.isDebugEnabled()) {

  •            log.debug("{}-{}-{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(lastTime)), workerId, sequence);

  •        }

  •        // 生成編號

  •        return ((time - SJDBC_EPOCH) << TIMESTAMP_LEFT_SHIFT_BITS) | (workerId << WORKER_ID_LEFT_SHIFT_BITS) | sequence;

  •    }

  •    //不停獲得時間,直到大於最後時間

  •    private long waitUntilNextTime(final long lastTime) {

  •        long time = clock.millis();

  •        while (time <= lastTime) {

  •            time = clock.millis();

  •        }

  •        return time;

  •    }

  • }


  • 通過這段代碼可以看到噹噹的時鐘回撥在單機上是做了處理的了,不但會丟擲Clock is moving backwards balabalabala的IllegalStateException,而且也做了waitUntilNextTime一直等待的處理

    除了housekeeper,在shutdown中也做了處理

    1.  /**

    2.    * Shutdown the pool, closing all idle connections and aborting or closing

    3.    * active connections.

    4.    *

    5.    * @throws InterruptedException thrown if the thread is interrupted during shutdown

    6.    */

    7.   public synchronized void shutdown() throws InterruptedException {

    8.      try {

    9.         poolState = POOL_SHUTDOWN;

    10.         if (addConnectionExecutor == null) { // pool never started

    11.            return;

    12.         }

    13.         logPoolState("Before shutdown ");

    14.         if (houseKeeperTask != null) {

    15.            houseKeeperTask.cancel(false);

    16.            houseKeeperTask = null;

    17.         }

    18.         softEvictConnections();

    19.         addConnectionExecutor.shutdown();

    20.         addConnectionExecutor.awaitTermination(getLoginTimeout(), SECONDS);

    21.         destroyHouseKeepingExecutorService();

    22.         connectionBag.close();

    23.         final ExecutorService assassinExecutor = createThreadPoolExecutor(config.getMaximumPoolSize(), poolName + " connection assassinator",

    24.                                                                           config.getThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

    25.         try {

    26.            final long start = currentTime();

    27.            do {

    28.               abortActiveConnections(assassinExecutor);

    29.               softEvictConnections();

    30.            } while (getTotalConnections() > 0 && elapsedMillis(start) < SECONDS.toMillis(10));

    31.         }

    32.         finally {

    33.            assassinExecutor.shutdown();

    34.            assassinExecutor.awaitTermination(10L, SECONDS);

    35.         }

    36.         shutdownNetworkTimeoutExecutor();

    37.         closeConnectionExecutor.shutdown();

    38.         closeConnectionExecutor.awaitTermination(10L, SECONDS);

    39.      }

    40.      finally {

    41.         logPoolState("After shutdown ");

    42.         unregisterMBeans();

    43.         metricsTracker.close();

    44.      }

    45.   }

    ConcurrentBag

    說到ConcurrentBag這個不得不提的類,我這裡取用一下文章做一下簡要介紹,本系列後面會專題系統分析: http://www.cnblogs.com/taisenki/p/7699667.html HikariCP連接池是基於自主實現的ConcurrentBag完成的資料連接的多執行緒共享交互,是HikariCP連接管理快速的其中一個關鍵點。 ConcurrentBag是一個專門的併發包裹,在連接池(多執行緒資料交互)的實現上具有比LinkedBlockingQueue和LinkedTransferQueue更優越的性能。 ConcurrentBag通過拆分 CopyOnWriteArrayList、ThreadLocal和SynchronousQueue 進行併發資料交互。

    • CopyOnWriteArrayList:負責存放ConcurrentBag中全部用於出借的資源

    • ThreadLocal:用於加速執行緒本地化資源訪問

    • SynchronousQueue:用於存在資源等待執行緒時的第一手資源交接

    ConcurrentBag中全部的資源均只能通過add方法進行添加,只能通過remove方法進行移出。

    1. public void add(final T bagEntry) {

    2.   if (closed) {

    3.      LOGGER.info("ConcurrentBag has been closed, ignoring add()");

    4.      throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");

    5.   }

    6.   sharedList.add(bagEntry); //新添加的資源優先放入CopyOnWriteArrayList

    7.   // 當有等待資源的執行緒時,將資源交到某個等待執行緒後才傳回(SynchronousQueue)

    8.   while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {

    9.      yield();

    10.   }

    11. }

    12. public boolean remove(final T bagEntry) {

    13.   // 如果資源正在使用且無法進行狀態切換,則傳回失敗

    14.   if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {

    15.      LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);

    16.      return false;

    17.   }

    18.   final boolean removed = sharedList.remove(bagEntry); // 從CopyOnWriteArrayList中移出

    19.   if (!removed && !closed) {

    20.      LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);

    21.   }

    22.   return removed;

    23. }

    ConcurrentBag中通過borrow方法進行資料資源借用,通過requite方法進行資源回收,註意其中borrow方法只提供物件取用,不移除物件,因此使用時通過borrow取出的物件必須通過requite方法進行放回,否則容易導致記憶體泄露!

    1. public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException {

    2.   // 優先查看有沒有可用的本地化的資源

    3.   final List<Object> list = threadList.get();

    4.   for (int i = list.size() - 1; i >= 0; i--) {

    5.      final Object entry = list.remove(i);

    6.      @SuppressWarnings("unchecked")

    7.      final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;

    8.      if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {

    9.         return bagEntry;

    10.      }

    11.   }

    12.   final int waiting = waiters.incrementAndGet();

    13.   try {

    14.      // 當無可用本地化資源時,遍歷全部資源,查看是否存在可用資源

    15.      // 因此被一個執行緒本地化的資源也可能被另一個執行緒“搶走”

    16.      for (T bagEntry : sharedList) {

    17.         if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {

    18.            if (waiting > 1) {

    19.                // 因為可能“搶走”了其他執行緒的資源,因此提醒包裹進行資源添加

    20.               listener.addBagItem(waiting - 1);

    21.            }

    22.            return bagEntry;

    23.         }

    24.      }

    25.      listener.addBagItem(waiting);

    26.      timeout = timeUnit.toNanos(timeout);

    27.      do {

    28.         final long start = currentTime();

    29.         // 當現有全部資源全部在使用中,等待一個被釋放的資源或者一個新資源

    30.         final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);

    31.         if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {

    32.            return bagEntry;

    33.         }

    34.         timeout -= elapsedNanos(start);

    35.      } while (timeout > 10_000);

    36.      return null;

    37.   }

    38.   finally {

    39.      waiters.decrementAndGet();

    40.   }

    41. }

    42. public void requite(final T bagEntry) {

    43.   // 將狀態轉為未在使用

    44.   bagEntry.setState(STATE_NOT_IN_USE);

    45.   // 判斷是否存在等待執行緒,若存在,則直接轉手資源

    46.   for (int i = 0; waiters.get() > 0; i++) {

    47.      if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {

    48.         return;

    49.      }

    50.      else if ((i & 0xff) == 0xff) {

    51.         parkNanos(MICROSECONDS.toNanos(10));

    52.      }

    53.      else {

    54.         yield();

    55.      }

    56.   }

    57.   // 否則,進行資源本地化

    58.   final List<Object> threadLocalList = threadList.get();

    59.   threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);

    60. }

    上述代碼中的 weakThreadLocals 是用來判斷是否使用弱取用,通過下述方法初始化:

    1. private boolean useWeakThreadLocals()

    2. {

    3.   try {

    4.      // 人工指定是否使用弱取用,但是官方不推薦進行自主設置。

    5.      if (System.getProperty("com.dareway.concurrent.useWeakReferences") != null) {

    6.         return Boolean.getBoolean("com.dareway.concurrent.useWeakReferences");

    7.      }

    8.      // 預設通過判斷初始化的ClassLoader是否是系統的ClassLoader來確定

    9.      return getClass().getClassLoader() != ClassLoader.getSystemClassLoader();

    10.   }

    11.   catch (SecurityException se) {

    12.      return true;

    13.   }

    14. }

    Hikari物理連接取用生命周期

    上面提到了很多概念,比如HikariDataSource、HikariPool、ConcurrentBag、ProxyFactory、PoolEntry等等,那麼這裡的關係是什麼呢?

    這裡推薦一下這篇文章 http://www.cnblogs.com/taisenki/p/7717912.html ,我取用一下部分內容:

    HikariCP中的連接取用流程如下:

    HikariPool負責對資源連接進行管理,而ConcurrentBag則是作為物理連接的共享資源站,PoolEntry則是對物理連接的1-1封裝。

    PoolEntry通過connectionBag的borrow方法從bag中取出,,之後通過PoolEntry.createProxyConnection呼叫工廠類生成HikariProxyConnection傳回。

    1. /**

    2. * Entry used in the ConcurrentBag to track Connection instances.

    3. *

    4. * @author Brett Wooldridge

    5. */

    6. final class PoolEntry implements IConcurrentBagEntry {

    7.   private static final Logger LOGGER = LoggerFactory.getLogger(PoolEntry.class);

    8.   private static final AtomicIntegerFieldUpdater<PoolEntry> stateUpdater;

    9.   Connection connection;

    10.   long lastAccessed;

    11.   long lastBorrowed;

    12.   @SuppressWarnings("FieldCanBeLocal")

    13.   private volatile int state = 0;

    14.   private volatile boolean evict;

    15.   private volatile ScheduledFuture> endOfLife;

    16.   private final FastList<Statement> openStatements;

    17.   private final HikariPool hikariPool;

    18.   private final boolean isReadOnly;

    19.   private final boolean isAutoCommit;

    20.   static

    21.   {

    22.      stateUpdater = AtomicIntegerFieldUpdater.newUpdater(PoolEntry.class, "state");

    23.   }

    24.   PoolEntry(final Connection connection, final PoolBase pool, final boolean isReadOnly, final boolean isAutoCommit)

    25.   {

    26.      this.connection = connection;

    27.      this.hikariPool = (HikariPool) pool;

    28.      this.isReadOnly = isReadOnly;

    29.      this.isAutoCommit = isAutoCommit;

    30.      this.lastAccessed = currentTime();

    31.      this.openStatements = new FastList<>(Statement.class, 16);

    32.   }

    33.   /**

    34.    * Release this entry back to the pool.

    35.    *

    36.    * @param lastAccessed last access time-stamp

    37.    */

    38.   void recycle(final long lastAccessed) {

    39.      if (connection != null) {

    40.         this.lastAccessed = lastAccessed;

    41.         hikariPool.recycle(this);

    42.      }

    43.   }

    44.   /**

    45.    * Set the end of life {@link ScheduledFuture}.

    46.    *

    47.    * @param endOfLife this PoolEntry/Connection's end of life {@link ScheduledFuture}

    48.    */

    49.   void setFutureEol(final ScheduledFuture> endOfLife) {

    50.      this.endOfLife = endOfLife;

    51.   }

    52.   Connection createProxyConnection(final ProxyLeakTask leakTask, final long now) {

    53.      return ProxyFactory.getProxyConnection(this, connection, openStatements, leakTask, now, isReadOnly, isAutoCommit);

    54.   }

    55.   void resetConnectionState(final ProxyConnection proxyConnection, final int dirtyBits) throws SQLException {

    56.      hikariPool.resetConnectionState(connection, proxyConnection, dirtyBits);

    57.   }

    58.   String getPoolName() {

    59.      return hikariPool.toString();

    60.   }

    61.   boolean isMarkedEvicted() {

    62.      return evict;

    63.   }

    64.   void markEvicted() {

    65.      this.evict = true;

    66.   }

    67.   void evict(final String closureReason) {

    68.      hikariPool.closeConnection(this, closureReason);

    69.   }

    70.   /** Returns millis since lastBorrowed */

    71.   long getMillisSinceBorrowed() {

    72.      return elapsedMillis(lastBorrowed);

    73.   }

    74.   /** {@inheritDoc} */

    75.   @Override

    76.   public String toString() {

    77.      final long now = currentTime();

    78.      return connection

    79.         + ", accessed " + elapsedDisplayString(lastAccessed, now) + " ago, "

    80.         + stateToString();

    81.   }

    82.   // ***********************************************************************

    83.   //                      IConcurrentBagEntry methods

    84.   // ***********************************************************************

    85.   /** {@inheritDoc} */

    86.   @Override

    87.   public int getState() {

    88.      return stateUpdater.get(this);

    89.   }

    90.   /** {@inheritDoc} */

    91.   @Override

    92.   public boolean compareAndSet(int expect, int update) {

    93.      return stateUpdater.compareAndSet(this, expect, update);

    94.   }

    95.   /** {@inheritDoc} */

    96.   @Override

    97.   public void setState(int update) {

    98.      stateUpdater.set(this, update);

    99.   }

    100.   Connection close() {

    101.      ScheduledFuture> eol = endOfLife;

    102.      if (eol != null && !eol.isDone() && !eol.cancel(false)) {

    103.         LOGGER.warn("{} - maxLifeTime expiration task cancellation unexpectedly returned false for connection {}", getPoolName(), connection);

    104.      }

    105.      Connection con = connection;

    106.      connection = null;

    107.      endOfLife = null;

    108.      return con;

    109.   }

    110.   private String stateToString() {

    111.      switch (state) {

    112.      case STATE_IN_USE:

    113.         return "IN_USE";

    114.      case STATE_NOT_IN_USE:

    115.         return "NOT_IN_USE";

    116.      case STATE_REMOVED:

    117.         return "REMOVED";

    118.      case STATE_RESERVED:

    119.         return "RESERVED";

    120.      default:

    121.         return "Invalid";

    122.      }

    123.   }

    124. }

    參考資料

    • https://segmentfault.com/a/1190000013118843

    • http://www.cnblogs.com/taisenki/p/7717912.html

    END

    赞(0)

    分享創造快樂