歡迎光臨
每天分享高質量文章

【追光者系列】HikariCP 原始碼分析之 allowPoolSuspension

點擊上方“芋道原始碼”,選擇“置頂公眾號”

技術文章第一時間送達!

原始碼精品專欄

 

摘要: 原創出處 https://mp.weixin.qq.com/s/-WGg22lUQU41c_8lx6kyQA 「渣渣王子」歡迎轉載,保留摘要,謝謝!

  • 概念

  • 用途及實戰思考

  • 原始碼解析

  • suspendPool

  • Construct for isAllowPoolSuspension

  • SuspendResumeLock

  • getConnection

  • resumePool

  • softEvictConnections

  • ConcurrentBag

  • 參考資料


概念

該屬性控制池是否可以通過JMX暫停和恢復。這對於某些故障轉移自動化方案很有用。當池被暫停時,呼叫 getConnection()將不會超時,並將一直保持到池恢復為止。 預設值:false。

allowPoolSuspension
This property controls whether the pool can be suspended and resumed through JMX. This is useful for certain failover automation scenarios. When the pool is suspended, calls to getConnection() will not timeout and will be held until the pool is resumed. Default: false

這裡要特別說明一下,必須開啟 allowPoolSuspension: true 且在 registerMbeans: true的情況下才能通過MBean Proxy調節softEvictConnections()和suspendPool()/resumePool() methods.

使用方式如下:

MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName poolName = new ObjectName("com.zaxxer.hikari:type=Pool (foo)");
HikariPoolMXBean poolProxy = JMX.newMXBeanProxy(mBeanServer, poolName, HikariPoolMXBean.class);
int idleConnections = poolProxy.getIdleConnections();
poolProxy.suspendPool();
poolProxy.softEvictConnections();
poolProxy.resumePool();

用途及實戰思考

作者是這麼說的:
https://github.com/brettwooldridge/HikariCP/issues/1060

All of the suspend use cases I have heard have centered around a pattern of:

  • Suspend the pool.

  • Alter the pool configuration, or alter DNS configuration (to point to a new master).

  • Soft-evict existing connections.

  • Resume the pool.

我做過試驗,Suspend期間getConnection確實不會超時,SQL執行都會被保留下來,軟驅除現有連接之後,一直保持到池恢復Resume時,這些SQL依然會繼續執行,也就是說用戶並不會丟資料。
但是在實際生產中,不影響業務很難,即使繼續執行,業務也可能超時了。
故障註入是中間件開發應該要做的,這個點的功能在實現chaosmonkey以模擬資料庫連接故障,但是監控過程中我發現hikaricp_pending_threads指標並沒有提升、MBean的threadAwaitingConnections也沒有改變,所以包括故障演練以後也可以不用搞得那麼複雜,收攏在中間件內部做可能更好,前提是對於這個引數,中間件還需要自研以增加模擬拋異常或是一些監控指標進行加強。
另外,長期阻塞該引數存在讓微服務卡死的風險

原始碼解析

本文基於hikariCP 2.7.3的原始碼進行分析

suspendPool

首先我們觀察com.zaxxer.hikari.pool.HikariPool#suspendPool方法,

   @Override
   public synchronized void suspendPool()
   
{
      if (suspendResumeLock == SuspendResumeLock.FAUX_LOCK) {
         throw new IllegalStateException(poolName + " - is not suspendable");
      }
      else if (poolState != POOL_SUSPENDED) {
         suspendResumeLock.suspend();
         poolState = POOL_SUSPENDED;
      }
   }

如果suspendResumeLock是FAUX_LOCK的話,就直接拋異常;否則,如果當前連接池狀態並不是POOL_SUSPENDED(1)狀態——還有POOL_NORMAL(0)及POOL_SHUTDOWN(2)狀態,呼叫java.util.concurrent.Semaphore.SuspendResumeLock的suspend方法,從此信號量獲取給定數目10000的許可,在提供這些許可前一直將執行緒阻塞。

private static final int MAX_PERMITS = 10000;
public void suspend()
   
{
      acquisitionSemaphore.acquireUninterruptibly(MAX_PERMITS);
   }

Construct for isAllowPoolSuspension

我前文提及的為什麼必須開啟allowPoolSuspension: true且在 registerMbeans: true的情況下才能通過MBean Proxy調節softEvictConnections()和suspendPool()/resumePool() methods,我之前的大綱文章【追光者系列】HikariCP預設配置也有提及,現在我帶大家從原始碼角度看一下:
我們看一下com.zaxxer.hikari.pool.HikariPool的建構式

/**
    * Construct a HikariPool with the specified configuration.
    *
    * @param config a HikariConfig instance
    */

   public HikariPool(final HikariConfig config)
   
{
      super(config);
      this.connectionBag = new ConcurrentBag<>(this);
      this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;
      this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();
      checkFailFast();
      if (config.getMetricsTrackerFactory() != null) {
         setMetricsTrackerFactory(config.getMetricsTrackerFactory());
      }
      else {
         setMetricRegistry(config.getMetricRegistry());
      }
      setHealthCheckRegistry(config.getHealthCheckRegistry());
      registerMBeans(this);
      ThreadFactory threadFactory = config.getThreadFactory();
      LinkedBlockingQueue addConnectionQueue = new LinkedBlockingQueue<>(config.getMaximumPoolSize());
      this.addConnectionQueue = unmodifiableCollection(addConnectionQueue);
      this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardPolicy());
      this.closeConnectionExecutor = createThreadPoolExecutor(config.getMaximumPoolSize(), poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
      this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService);
      this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, HOUSEKEEPING_PERIOD_MS, MILLISECONDS);
   }

在這裡我們可以看到

this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;

isAllowPoolSuspension預設值是false的,建構式直接會創建SuspendResumeLock.FAUX_LOCK;只有isAllowPoolSuspension為true時,才會真正創建SuspendResumeLock。

SuspendResumeLock

com.zaxxer.hikari.util.SuspendResumeLock內部實現了一虛一實兩個java.util.concurrent.Semaphore

/**
 * This class implements a lock that can be used to suspend and resume the pool.  It
 * also provides a faux implementation that is used when the feature is disabled that
 * hopefully gets fully "optimized away" by the JIT.
 *
 * @author Brett Wooldridge
 */

public class SuspendResumeLock
{
   public static final SuspendResumeLock FAUX_LOCK = new SuspendResumeLock(false) {
      @Override
      public void acquire() {}
      @Override
      public void release() {}
      @Override
      public void suspend() {}
      @Override
      public void resume() {}
   };
   private static final int MAX_PERMITS = 10000;
   private final Semaphore acquisitionSemaphore;
   /**
    * Default constructor
    */

   public SuspendResumeLock()
   
{
      this(true);
   }
   private SuspendResumeLock(final boolean createSemaphore)
   
{
      acquisitionSemaphore = (createSemaphore ? new Semaphore(MAX_PERMITS, true) : null);
   }
   public void acquire()
   
{
      acquisitionSemaphore.acquireUninterruptibly();
   }
   public void release()
   
{
      acquisitionSemaphore.release();
   }
   public void suspend()
   
{
      acquisitionSemaphore.acquireUninterruptibly(MAX_PERMITS);
   }
   public void resume()
   
{
      acquisitionSemaphore.release(MAX_PERMITS);
   }
}

由於Hikari的isAllowPoolSuspension預設值是false的,FAUX_LOCK只是一個空方法,acquisitionSemaphore物件也是空的;如果isAllowPoolSuspension值調整為true,當收到MBean的suspend呼叫時將會一次性acquisitionSemaphore.acquireUninterruptibly從此信號量獲取給定數目MAX_PERMITS 10000的許可,在提供這些許可前一直將執行緒阻塞。之後HikariPool的getConnection方法獲取不到連接,阻塞在suspendResumeLock.acquire(),除非resume方法釋放給定數目MAX_PERMITS 10000的許可,將其傳回到信號量。

getConnection

我們看一下com.zaxxer.hikari.pool.HikariPool的getConnection核心方法

 /**
    * Get a connection from the pool, or timeout after connectionTimeout milliseconds.
    *
    * @return a java.sql.Connection instance
    * @throws SQLException thrown if a timeout occurs trying to obtain a connection
    */

   public Connection getConnection() throws SQLException
   
{
      return getConnection(connectionTimeout);
   }
/**
    * Get a connection from the pool, or timeout after the specified number of milliseconds.
    *
    * @param hardTimeout the maximum time to wait for a connection from the pool
    * @return a java.sql.Connection instance
    * @throws SQLException thrown if a timeout occurs trying to obtain a connection
    */

   public Connection getConnection(final long hardTimeout) throws SQLException
   
{
      suspendResumeLock.acquire();
      final long startTime = currentTime();
      try {
         long timeout = hardTimeout;
         do {
            PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
            if (poolEntry == null) {
               break// We timed out... break and throw exception
            }
            final long now = currentTime();
            if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > ALIVE_BYPASS_WINDOW_MS && !isConnectionAlive(poolEntry.connection))) {
               closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
               timeout = hardTimeout - elapsedMillis(startTime);
            }
            else {
               metricsTracker.recordBorrowStats(poolEntry, startTime);
               return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
            }
         } while (timeout > 0L);
         metricsTracker.recordBorrowTimeoutStats(startTime);
         throw createTimeoutException(startTime);
      }
      catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
      }
      finally {
         suspendResumeLock.release();
      }
   }

我們可以看到在getConnection的方法最前面和finally最後的時候分別進行了suspendResumeLock.acquire()和suspendResumeLock.release的操作,hardTimeout就是connectionTimeout,預設值SECONDS.toMillis(30) = 30000(如果小於250毫秒,則被重置回30秒),代表the maximum time to wait for a connection from the pool(等待來自池的連接的最大毫秒數,補充一下,在acquire之後如果在沒有可用連接的情況下超過此時間,則會丟擲SQLException)。
suspendPool之後的每次getConnection方法,其實都會卡在上面代碼第一行suspendResumeLock.acquire()中在SuspendResumeLock的具體實現

   public void acquire()
   
{
      acquisitionSemaphore.acquireUninterruptibly();
   }

resumePool

resumePool只針對當前是POOL_SUSPENDED狀態的連接池置為POOL_NORMAL,然後fillPool,最終resume實際呼叫SuspendResumeLock的acquisitionSemaphore.release(MAX_PERMITS)方法釋放給定數目MAX_PERMITS 10000的許可,將其傳回到信號量。

  @Override
   public synchronized void resumePool()
   
{
      if (poolState == POOL_SUSPENDED) {
         poolState = POOL_NORMAL;
         fillPool();
         suspendResumeLock.resume();
      }
   }

fillPool

從當前的空閑連接(在執行時被感知到的)填充到minimumIdle(HikariCP嘗試在池中維護的最小空閑連接數,如果空閑連接低於此值並且池中的總連接數少於maximumPoolSize,HikariCP將盡最大努力快速高效地添加其他連接)。

   /**
    * Fill pool up from current idle connections (as they are perceived at the point of execution) to minimumIdle connections.
    */

   private synchronized void fillPool()
   
{
      final int connectionsToAdd = Math.min(config.getMaximumPoolSize() - getTotalConnections(), config.getMinimumIdle() - getIdleConnections())
                                   - addConnectionQueue.size();
      for (int i = 0; i          addConnectionExecutor.submit((i 1) ? POOL_ENTRY_CREATOR : POST_FILL_POOL_ENTRY_CREATOR);
      }
   }

com.zaxxer.hikari.util.SuspendResumeLock#resume

   public void resume()
   
{
      acquisitionSemaphore.release(MAX_PERMITS);
   }

softEvictConnections

Mbean的softEvictConnections方法真正執行的是com.zaxxer.hikari.pool.HikariPool中softEvictConnections方法,這是一種“軟”驅逐池中連接的方法,如果呼叫方是owner身份,或者連接處於空閑狀態,可以立即關閉連接。否則,我們將其“標記”為驅逐,以便下次有人試圖從池中獲取它時將其逐出。

public void softEvictConnections()
   
{
      connectionBag.values().forEach(poolEntry -> softEvictConnection(poolEntry, "(connection evicted)"false /* not owner */));
   }
/**
    * "Soft" evict a Connection (/PoolEntry) from the pool.  If this method is being called by the user directly
    * through {@link com.zaxxer.hikari.HikariDataSource#evictConnection(Connection)} then {@code owner} is {@code true}.
    *
    * If the caller is the owner, or if the Connection is idle (i.e. can be "reserved" in the {@link ConcurrentBag}),
    * then we can close the connection immediately.  Otherwise, we leave it "marked" for eviction so that it is evicted
    * the next time someone tries to acquire it from the pool.
    *
    * @param poolEntry the PoolEntry (/Connection) to "soft" evict from the pool
    * @param reason the reason that the connection is being evicted
    * @param owner true if the caller is the owner of the connection, false otherwise
    * @return true if the connection was evicted (closed), false if it was merely marked for eviction
    */

   private boolean softEvictConnection(final PoolEntry poolEntry, final String reason, final boolean owner)
   
{
      poolEntry.markEvicted();
      if (owner || connectionBag.reserve(poolEntry)) {
         closeConnection(poolEntry, reason);
         return true;
      }
      return false;
   }

執行此方法時我們的owner預設傳false(not owner),呼叫com.zaxxer.hikari.util.ConcurrentBag的reserve對方進行保留

 /**
    * The method is used to make an item in the bag "unavailable" for
    * borrowing.  It is primarily used when wanting to operate on items
    * returned by the values(int) method.  Items that are
    * reserved can be removed from the bag via remove(T)
    * without the need to unreserve them.  Items that are not removed
    * from the bag can be make available for borrowing again by calling
    * the unreserve(T) method.
    *
    * @param bagEntry the item to reserve
    * @return true if the item was able to be reserved, false otherwise
    */

   public boolean reserve(final T bagEntry)
   
{
      return bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED);
   }

ConcurrentBag

說到ConcurrentBag這個不得不提的類,我這裡取用一下文章做一下簡要介紹,本系列後面會專題系統分析:
http://www.cnblogs.com/taisenki/p/7699667.html
HikariCP連接池是基於自主實現的ConcurrentBag完成的資料連接的多執行緒共享交互,是HikariCP連接管理快速的其中一個關鍵點。
ConcurrentBag是一個專門的併發包裹,在連接池(多執行緒資料交互)的實現上具有比LinkedBlockingQueue和LinkedTransferQueue更優越的性能。
ConcurrentBag通過拆分 CopyOnWriteArrayList、ThreadLocal和SynchronousQueue
進行併發資料交互。

  • CopyOnWriteArrayList:負責存放ConcurrentBag中全部用於出借的資源

  • ThreadLocal:用於加速執行緒本地化資源訪問

  • SynchronousQueue:用於存在資源等待執行緒時的第一手資源交接

ConcurrentBag中全部的資源均只能通過add方法進行添加,只能通過remove方法進行移出。

public void add(final T bagEntry)
{
   if (closed) {
      LOGGER.info("ConcurrentBag has been closed, ignoring add()");
      throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
   }
   sharedList.add(bagEntry); //新添加的資源優先放入CopyOnWriteArrayList
   // 當有等待資源的執行緒時,將資源交到某個等待執行緒後才傳回(SynchronousQueue)
   while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
      yield();
   }
}
public boolean remove(final T bagEntry)
{
   // 如果資源正在使用且無法進行狀態切換,則傳回失敗
   if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
      LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
      return false;
   }
   final boolean removed = sharedList.remove(bagEntry); // 從CopyOnWriteArrayList中移出
   if (!removed && !closed) {
      LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
   }
   return removed;
}

ConcurrentBag中通過borrow方法進行資料資源借用,通過requite方法進行資源回收,註意其中borrow方法只提供物件取用,不移除物件,因此使用時通過borrow取出的物件必須通過requite方法進行放回,否則容易導致記憶體泄露!

public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
   // 優先查看有沒有可用的本地化的資源
   final List list = threadList.get();
   for (int i = list.size() - 1; i >= 0; i--) {
      final Object entry = list.remove(i);
      @SuppressWarnings("unchecked")
      final T bagEntry = weakThreadLocals ? ((WeakReference) entry).get() : (T) entry;
      if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
         return bagEntry;
      }
   }
   final int waiting = waiters.incrementAndGet();
   try {
      // 當無可用本地化資源時,遍歷全部資源,查看是否存在可用資源
      // 因此被一個執行緒本地化的資源也可能被另一個執行緒“搶走”
      for (T bagEntry : sharedList) {
         if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            if (waiting > 1) {
                // 因為可能“搶走”了其他執行緒的資源,因此提醒包裹進行資源添加
               listener.addBagItem(waiting - 1);
            }
            return bagEntry;
         }
      }
      listener.addBagItem(waiting);
      timeout = timeUnit.toNanos(timeout);
      do {
         final long start = currentTime();
         // 當現有全部資源全部在使用中,等待一個被釋放的資源或者一個新資源
         final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
         if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
         }
         timeout -= elapsedNanos(start);
      } while (timeout > 10_000);
      return null;
   }
   finally {
      waiters.decrementAndGet();
   }
}
public void requite(final T bagEntry)
{
   // 將狀態轉為未在使用
   bagEntry.setState(STATE_NOT_IN_USE);
   // 判斷是否存在等待執行緒,若存在,則直接轉手資源
   for (int i = 0; waiters.get() > 0; i++) {
      if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
         return;
      }
      else if ((i & 0xff) == 0xff) {
         parkNanos(MICROSECONDS.toNanos(10));
      }
      else {
         yield();
      }
   }
   // 否則,進行資源本地化
   final List threadLocalList = threadList.get();
   threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}

上述代碼中的 weakThreadLocals 是用來判斷是否使用弱取用,通過下述方法初始化:

private boolean useWeakThreadLocals()
{
   try {
      // 人工指定是否使用弱取用,但是官方不推薦進行自主設置。
      if (System.getProperty("com.dareway.concurrent.useWeakReferences") != null) {
         return Boolean.getBoolean("com.dareway.concurrent.useWeakReferences");
      }
      // 預設通過判斷初始化的ClassLoader是否是系統的ClassLoader來確定
      return getClass().getClassLoader() != ClassLoader.getSystemClassLoader();
   }
   catch (SecurityException se) {
      return true;
   }
}

參考資料

【追光者系列】Hikari連接池配多大合適?

【追光者系列】HikariCP連接池監控指標實戰

【追光者系列】HikariCP預設配置

【追光者系列】Can you Read Aloud The Word Hikari Properly?

https://github.com/brettwooldridge/HikariCP
https://github.com/brettwooldridge/HikariCP/issues/1060
https://github.com/brettwooldridge/HikariCP/wiki/MBean-(JMX)-Monitoring-and-Management
https://segmentfault.com/a/1190000013062326
http://www.cnblogs.com/taisenki/p/7699667.html

666. 彩蛋




如果你對 Dubbo 感興趣,歡迎加入我的知識星球一起交流。

知識星球

目前在知識星球(https://t.zsxq.com/2VbiaEu)更新瞭如下 Dubbo 原始碼解析如下:

01. 除錯環境搭建
02. 專案結構一覽
03. 配置 Configuration
04. 核心流程一覽

05. 拓展機制 SPI

06. 執行緒池

07. 服務暴露 Export

08. 服務取用 Refer

09. 註冊中心 Registry

10. 動態編譯 Compile

11. 動態代理 Proxy

12. 服務呼叫 Invoke

13. 呼叫特性 

14. 過濾器 Filter

15. NIO 服務器

16. P2P 服務器

17. HTTP 服務器

18. 序列化 Serialization

19. 集群容錯 Cluster

20. 優雅停機

21. 日誌適配

22. 狀態檢查

23. 監控中心 Monitor

24. 管理中心 Admin

25. 運維命令 QOS

26. 鏈路追蹤 Tracing


一共 60 篇++

原始碼不易↓↓↓

點贊支持老艿艿↓↓

赞(0)

分享創造快樂