歡迎光臨
每天分享高質量文章

【追光者系列】HikariCP原始碼分析之leakDetectionThreshold及實戰解決Spark/Scala連線池洩漏

   摘自【工匠小豬豬的技術世界】 

  1. 這是一個系列,有興趣的朋友可以持續關註

  2. 如果你有HikariCP使用上的問題,可以給我留言,我們一起溝通討論

  3. 希望大家可以提供我一些案例,我也希望可以支援你們做一些調優


概念

此屬性控制在記錄訊息之前連線可能離開池的時間量,單位毫秒,預設為0,表明可能存在連線洩漏。 如果大於0且不是單元測試,則進一步判斷:(leakDetectionThreshold < SECONDS.toMillis(2) or (leakDetectionThreshold > maxLifetime && maxLifetime > 0),會被重置為0。即如果要生效則必須>0,而且不能小於2秒,而且當maxLifetime > 0時不能大於maxLifetime(預設值1800000毫秒=30分鐘)。

leakDetectionThreshold  This property controls the amount of time that a connection can be out of the pool before a message is logged indicating a possible connection leak. A value of 0 means leak detection is disabled. Lowest acceptable value for enabling leak detection is 2000 (2 seconds). Default: 0

更多配置大綱詳見文章 【追光者系列】HikariCP預設配置

原始碼解析

我們首先來看一下leakDetectionThreshold用在了哪裡的綱要圖:

Write

還記得上一篇文章【追光者系列】HikariCP原始碼分析之從validationTimeout來講講Hikari 2.7.5版本的那些故事提到:我們可以看到在兩處看到validationTimeout的寫入,一處是PoolBase建構式,另一處是HouseKeeper執行緒。 leakDetectionThreshold的用法可以說是異曲同工,除了建構式之外,也用了HouseKeeper執行緒去處理。

HikariConfig

在com.zaxxer.hikari.HikariConfig中進行了leakDetectionThreshold初始化工作,

  1. @Override

  2.   public void setLeakDetectionThreshold(long leakDetectionThresholdMs) {

  3.      this.leakDetectionThreshold = leakDetectionThresholdMs;

  4.   }```

  5. validateNumerics方法中則是解釋了上文及官方檔案中該值validate的策略

if (leakDetectionThreshold > 0 && !unitTest) {         if (leakDetectionThreshold < SECONDS.toMillis(2) || (leakDetectionThreshold > maxLifetime && maxLifetime > 0)) {            LOGGER.warn("{} - leakDetectionThreshold is less than 2000ms or more than maxLifetime, disabling it.", poolName);            leakDetectionThreshold = 0;         }      }```

該方法會被HikariConfig#validate所呼叫,而HikariConfig#validate會在HikariDataSource的specified configuration的建構式使用到

  1.  /**

  2.    * Construct a HikariDataSource with the specified configuration.  The

  3.    * {@link HikariConfig} is copied and the pool is started by invoking this

  4.    * constructor.

  5.    *

  6.    * The {@link HikariConfig} can be modified without affecting the HikariDataSource

  7.    * and used to initialize another HikariDataSource instance.

  8.    *

  9.    * @param configuration a HikariConfig instance

  10.    */

  11.   public HikariDataSource(HikariConfig configuration)

  12.   {

  13.      configuration.validate();

  14.      configuration.copyStateTo(this);

  15.      LOGGER.info("{} - Starting...", configuration.getPoolName());

  16.      pool = fastPathPool = new HikariPool(this);

  17.      LOGGER.info("{} - Start completed.", configuration.getPoolName());

  18.      this.seal();

  19.   }

也在每次getConnection的時候用到了,

  1. // ***********************************************************************

  2.   //                          DataSource methods

  3.   // ***********************************************************************

  4.   /** {@inheritDoc} */

  5.   @Override

  6.   public Connection getConnection() throws SQLException {

  7.      if (isClosed()) {

  8.         throw new SQLException("HikariDataSource " + this + " has been closed.");

  9.      }

  10.      if (fastPathPool != null) {

  11.         return fastPathPool.getConnection();

  12.      }

  13.      // See http://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java

  14.      HikariPool result = pool;

  15.      if (result == null) {

  16.         synchronized (this) {

  17.            result = pool;

  18.            if (result == null) {

  19.               validate();

  20.               LOGGER.info("{} - Starting...", getPoolName());

  21.               try {

  22.                  pool = result = new HikariPool(this);

  23.                  this.seal();

  24.               }

  25.               catch (PoolInitializationException pie) {

  26.                  if (pie.getCause() instanceof SQLException) {

  27.                     throw (SQLException) pie.getCause();

  28.                  }

  29.                  else {

  30.                     throw pie;

  31.                  }

  32.               }

  33.               LOGGER.info("{} - Start completed.", getPoolName());

  34.            }

  35.         }

  36.      }

  37.      return result.getConnection();

  38.   }

這裡要特別提一下一個很牛逼的Double-checkedlocking的實現,大家可以看一下這篇文章 https://en.wikipedia.org/wiki/Double-checkedlocking#UsageinJava

  1. // Works with acquire/release semantics for volatile in Java 1.5 and later

  2. // Broken under Java 1.4 and earlier semantics for volatile

  3. class Foo {

  4.    private volatile Helper helper;

  5.    public Helper getHelper() {

  6.        Helper localRef = helper;

  7.        if (localRef == null) {

  8.            synchronized(this) {

  9.                localRef = helper;

  10.                if (localRef == null) {

  11.                    helper = localRef = new Helper();

  12.                }

  13.            }

  14.        }

  15.        return localRef;

  16.    }

  17.    // other functions and members...

  18. }

HouseKeeper

我們再來看一下com.zaxxer.hikari.pool.HikariPool這個程式碼,該執行緒嘗試在池中維護的最小空閑連線數,並不斷掃清的透過MBean調整的connectionTimeout和validationTimeout等值,leakDetectionThreshold這個值也是透過這個HouseKeeper的leakTask.updateLeakDetectionThreshold(config.getLeakDetectionThreshold())去管理的。

  1.  /**

  2.    * The house keeping task to retire and maintain minimum idle connections.

  3.    */

  4.   private final class HouseKeeper implements Runnable {

  5.      private volatile long previous = plusMillis(currentTime(), -HOUSEKEEPING_PERIOD_MS);

  6.      @Override

  7.      public void run()

  8.      {

  9.         try {

  10.            // refresh timeouts in case they changed via MBean

  11.            connectionTimeout = config.getConnectionTimeout();

  12.            validationTimeout = config.getValidationTimeout();

  13.            leakTask.updateLeakDetectionThreshold(config.getLeakDetectionThreshold());

  14.            final long idleTimeout = config.getIdleTimeout();

  15.            final long now = currentTime();

  16.            // Detect retrograde time, allowing +128ms as per NTP spec.

  17.            if (plusMillis(now, 128) < plusMillis(previous, HOUSEKEEPING_PERIOD_MS)) {

  18.               LOGGER.warn("{} - Retrograde clock change detected (housekeeper delta={}), soft-evicting connections from pool.",

  19.                           poolName, elapsedDisplayString(previous, now));

  20.               previous = now;

  21.               softEvictConnections();

  22.               fillPool();

  23.               return;

  24.            }

  25.            else if (now > plusMillis(previous, (3 * HOUSEKEEPING_PERIOD_MS) / 2)) {

  26.               // No point evicting for forward clock motion, this merely accelerates connection retirement anyway

  27.               LOGGER.warn("{} - Thread starvation or clock leap detected (housekeeper delta={}).", poolName, elapsedDisplayString(previous, now));

  28.            }

  29.            previous = now;

  30.            String afterPrefix = "Pool ";

  31.            if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) {

  32.               logPoolState("Before cleanup ");

  33.               afterPrefix = "After cleanup  ";

  34.               final List<PoolEntry> notInUse = connectionBag.values(STATE_NOT_IN_USE);

  35.               int removed = 0;

  36.               for (PoolEntry entry : notInUse) {

  37.                  if (elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) {

  38.                     closeConnection(entry, "(connection has passed idleTimeout)");

  39.                     if (++removed > config.getMinimumIdle()) {

  40.                        break;

  41.                     }

  42.                  }

  43.               }

  44.            }

  45.            logPoolState(afterPrefix);

  46.            fillPool(); // Try to maintain minimum connections

  47.         }

  48.         catch (Exception e) {

  49.            LOGGER.error("Unexpected exception in housekeeping task", e);

  50.         }

  51.      }

  52.   }

這裡補充說一下這個HouseKeeper,它是在com.zaxxer.hikari.pool.HikariPool的建構式中初始化的:this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();

  1. /**

  2.    * Create/initialize the Housekeeping service {@link ScheduledExecutorService}.  If the user specified an Executor

  3.    * to be used in the {@link HikariConfig}, then we use that.  If no Executor was specified (typical), then create

  4.    * an Executor and configure it.

  5.    *

  6.    * @return either the user specified {@link ScheduledExecutorService}, or the one we created

  7.    */

  8.   private ScheduledExecutorService initializeHouseKeepingExecutorService() {

  9.      if (config.getScheduledExecutor() == null) {

  10.         final ThreadFactory threadFactory = Optional.ofNullable(config.getThreadFactory()).orElse(new DefaultThreadFactory(poolName + " housekeeper", true));

  11.         final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory, new ThreadPoolExecutor.DiscardPolicy());

  12.         executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);

  13.         executor.setRemoveOnCancelPolicy(true);

  14.         return executor;

  15.      }

  16.      else {

  17.         return config.getScheduledExecutor();

  18.      }

  19.   }

這裡簡要說明一下,ScheduledThreadPoolExecutor是ThreadPoolExecutor類的子類,因為繼承了ThreadPoolExecutor類所有的特性。但是,Java推薦僅在開發定時任務程式時採用ScheduledThreadPoolExecutor類。 在呼叫shutdown()方法而仍有待處理的任務需要執行時,可以配置ScheduledThreadPoolExecutor的行為。預設的行為是不論執行器是否結束,待處理的任務仍將被執行。但是,透過呼叫ScheduledThreadPoolExecutor類的setExecuteExistingDelayedTasksAfterShutdownPolicy()方法則可以改變這個行為。傳遞false引數給這個方法,執行shutdown()方法之後,待處理的任務將不會被執行。 取消任務後,判斷是否需要從阻塞佇列中移除任務。其中removeOnCancel引數透過setRemoveOnCancelPolicy()設定。之所以要在取消任務後移除阻塞佇列中任務,是為了防止佇列中積壓大量已被取消的任務。 從這兩個引數配置大家可以瞭解到作者的對於HouseKeeper的配置初衷。

小結

Hikari透過建構式和HouseKeeper對於一些配置引數進行初始化及動態賦值,動態賦值依賴於HikariConfigMXbean以及使用任務排程執行緒池ScheduledThreadPoolExecutor來不斷掃清配置的。

我們僅僅以com.zaxxer.hikari.HikariConfig來做下小結,允許在執行時進行動態修改的主要有:

  1. // Properties changeable at runtime through the HikariConfigMXBean

  2.   private volatile long connectionTimeout;

  3.   private volatile long validationTimeout;

  4.   private volatile long idleTimeout;

  5.   private volatile long leakDetectionThreshold;

  6.   private volatile long maxLifetime;

  7.   private volatile int maxPoolSize;

  8.   private volatile int minIdle;

  9.   private volatile String username;

  10.   private volatile String password;

不允許在執行時進行改變的主要有

  1.   // Properties NOT changeable at runtime

  2.   private long initializationFailTimeout;

  3.   private String catalog;

  4.   private String connectionInitSql;

  5.   private String connectionTestQuery;

  6.   private String dataSourceClassName;

  7.   private String dataSourceJndiName;

  8.   private String driverClassName;

  9.   private String jdbcUrl;

  10.   private String poolName;

  11.   private String schema;

  12.   private String transactionIsolationName;

  13.   private boolean isAutoCommit;

  14.   private boolean isReadOnly;

  15.   private boolean isIsolateInternalQueries;

  16.   private boolean isRegisterMbeans;

  17.   private boolean isAllowPoolSuspension;

  18.   private DataSource dataSource;

  19.   private Properties dataSourceProperties;

  20.   private ThreadFactory threadFactory;

  21.   private ScheduledExecutorService scheduledExecutor;

  22.   private MetricsTrackerFactory metricsTrackerFactory;

  23.   private Object metricRegistry;

  24.   private Object healthCheckRegistry;

  25.   private Properties healthCheckProperties;

Read

getConnection

在com.zaxxer.hikari.pool.HikariPool的核心方法getConnection傳回的時候呼叫了poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now) 註意,建立代理連線的時候關聯了ProxyLeakTask。 連線洩漏檢測的原理就是:連線有借有還,hikari是每借用一個connection則會建立一個延時的定時任務,在歸還或者出異常的或者使用者手動呼叫evictConnection的時候cancel掉這個task

  1. /**

  2.    * Get a connection from the pool, or timeout after the specified number of milliseconds.

  3.    *

  4.    * @param hardTimeout the maximum time to wait for a connection from the pool

  5.    * @return a java.sql.Connection instance

  6.    * @throws SQLException thrown if a timeout occurs trying to obtain a connection

  7.    */

  8.   public Connection getConnection(final long hardTimeout) throws SQLException {

  9.      suspendResumeLock.acquire();

  10.      final long startTime = currentTime();

  11.      try {

  12.         long timeout = hardTimeout;

  13.         do {

  14.            PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);

  15.            if (poolEntry == null) {

  16.               break; // We timed out... break and throw exception

  17.            }

  18.            final long now = currentTime();

  19.            if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > ALIVE_BYPASS_WINDOW_MS && !isConnectionAlive(poolEntry.connection))) {

  20.               closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);

  21.               timeout = hardTimeout - elapsedMillis(startTime);

  22.            }

  23.            else {

  24.               metricsTracker.recordBorrowStats(poolEntry, startTime);

  25.               return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);

  26.            }

  27.         } while (timeout > 0L);

  28.         metricsTracker.recordBorrowTimeoutStats(startTime);

  29.         throw createTimeoutException(startTime);

  30.      }

  31.      catch (InterruptedException e) {

  32.         Thread.currentThread().interrupt();

  33.         throw new SQLException(poolName + " - Interrupted during connection acquisition", e);

  34.      }

  35.      finally {

  36.         suspendResumeLock.release();

  37.      }

  38.   }

leakTaskFactory、ProxyLeakTaskFactory、ProxyLeakTask

在HikariPool建構式裡,初始化了leakTaskFactory,以及houseKeepingExecutorService。

  1. this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();

  2. this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService);

  3. this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, HOUSEKEEPING_PERIOD_MS, MILLISECONDS);

com.zaxxer.hikari.pool.ProxyLeakTaskFactory是作者慣用的設計,我們看一下原始碼:

  1. /**

  2. * A factory for {@link ProxyLeakTask} Runnables that are scheduled in the future to report leaks.

  3. *

  4. * @author Brett Wooldridge

  5. * @author Andreas Brenk

  6. */

  7. class ProxyLeakTaskFactory {

  8.   private ScheduledExecutorService executorService;

  9.   private long leakDetectionThreshold;

  10.   ProxyLeakTaskFactory(final long leakDetectionThreshold, final ScheduledExecutorService executorService)

  11.   {

  12.      this.executorService = executorService;

  13.      this.leakDetectionThreshold = leakDetectionThreshold;

  14.   }

  15.   ProxyLeakTask schedule(final PoolEntry poolEntry) {

  16.      return (leakDetectionThreshold == 0) ? ProxyLeakTask.NO_LEAK : scheduleNewTask(poolEntry);

  17.   }

  18.   void updateLeakDetectionThreshold(final long leakDetectionThreshold) {

  19.      this.leakDetectionThreshold = leakDetectionThreshold;

  20.   }

  21.   private ProxyLeakTask scheduleNewTask(PoolEntry poolEntry) {

  22.      ProxyLeakTask task = new ProxyLeakTask(poolEntry);

  23.      task.schedule(executorService, leakDetectionThreshold);

  24.      return task;

  25.   }

  26. }

如果leakDetectionThreshold=0,即禁用連線洩露檢測,schedule傳回的是ProxyLeakTask.NO_LEAK,否則則新建一個ProxyLeakTask,在leakDetectionThreshold時間後觸發

再看一下com.zaxxer.hikari.pool.ProxyLeakTask的原始碼

  1. /**

  2. * A Runnable that is scheduled in the future to report leaks.  The ScheduledFuture is

  3. * cancelled if the connection is closed before the leak time expires.

  4. *

  5. * @author Brett Wooldridge

  6. */

  7. class ProxyLeakTask implements Runnable {

  8.   private static final Logger LOGGER = LoggerFactory.getLogger(ProxyLeakTask.class);

  9.   static final ProxyLeakTask NO_LEAK;

  10.   private ScheduledFuture> scheduledFuture;

  11.   private String connectionName;

  12.   private Exception exception;

  13.   private String threadName;

  14.   private boolean isLeaked;

  15.   static

  16.   {

  17.      NO_LEAK = new ProxyLeakTask() {

  18.         @Override

  19.         void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold) {}

  20.         @Override

  21.         public void run() {}

  22.         @Override

  23.         public void cancel() {}

  24.      };

  25.   }

  26.   ProxyLeakTask(final PoolEntry poolEntry)

  27.   {

  28.      this.exception = new Exception("Apparent connection leak detected");

  29.      this.threadName = Thread.currentThread().getName();

  30.      this.connectionName = poolEntry.connection.toString();

  31.   }

  32.   private ProxyLeakTask() {

  33.   }

  34.   void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold) {

  35.      scheduledFuture = executorService.schedule(this, leakDetectionThreshold, TimeUnit.MILLISECONDS);

  36.   }

  37.   /** {@inheritDoc} */

  38.   @Override

  39.   public void run() {

  40.      isLeaked = true;

  41.      final StackTraceElement[] stackTrace = exception.getStackTrace();

  42.      final StackTraceElement[] trace = new StackTraceElement[stackTrace.length - 5];

  43.      System.arraycopy(stackTrace, 5, trace, 0, trace.length);

  44.      exception.setStackTrace(trace);

  45.      LOGGER.warn("Connection leak detection triggered for {} on thread {}, stack trace follows", connectionName, threadName, exception);

  46.   }

  47.   void cancel() {

  48.      scheduledFuture.cancel(false);

  49.      if (isLeaked) {

  50.         LOGGER.info("Previously reported leaked connection {} on thread {} was returned to the pool (unleaked)", connectionName, threadName);

  51.      }

  52.   }

  53. }

NO_LEAK類裡頭的方法都是空操作 一旦該task被觸發,則丟擲Exception("Apparent connection leak detected")

我們想起了什麼,是不是想起了【追光者系列】HikariCP原始碼分析之allowPoolSuspension那篇文章裡有著一摸一樣的設計?

  1. this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;

isAllowPoolSuspension預設值是false的,建構式直接會建立SuspendResumeLock.FAUX_LOCK;只有isAllowPoolSuspension為true時,才會真正建立SuspendResumeLock。

com.zaxxer.hikari.util.SuspendResumeLock內部實現了一虛一實兩個java.util.concurrent.Semaphore

  1. /**

  2. * This class implements a lock that can be used to suspend and resume the pool.  It

  3. * also provides a faux implementation that is used when the feature is disabled that

  4. * hopefully gets fully "optimized away" by the JIT.

  5. *

  6. * @author Brett Wooldridge

  7. */

  8. public class SuspendResumeLock {

  9.   public static final SuspendResumeLock FAUX_LOCK = new SuspendResumeLock(false) {

  10.      @Override

  11.      public void acquire() {}

  12.      @Override

  13.      public void release() {}

  14.      @Override

  15.      public void suspend() {}

  16.      @Override

  17.      public void resume() {}

  18.   };

  19.   private static final int MAX_PERMITS = 10000;

  20.   private final Semaphore acquisitionSemaphore;

  21.   /**

  22.    * Default constructor

  23.    */

  24.   public SuspendResumeLock() {

  25.      this(true);

  26.   }

  27.   private SuspendResumeLock(final boolean createSemaphore) {

  28.      acquisitionSemaphore = (createSemaphore ? new Semaphore(MAX_PERMITS, true) : null);

  29.   }

  30.   public void acquire() {

  31.      acquisitionSemaphore.acquireUninterruptibly();

  32.   }

  33.   public void release() {

  34.      acquisitionSemaphore.release();

  35.   }

  36.   public void suspend() {

  37.      acquisitionSemaphore.acquireUninterruptibly(MAX_PERMITS);

  38.   }

  39.   public void resume() {

  40.      acquisitionSemaphore.release(MAX_PERMITS);

  41.   }

  42. }

由於Hikari的isAllowPoolSuspension預設值是false的,FAUXLOCK只是一個空方法,acquisitionSemaphore物件也是空的;如果isAllowPoolSuspension值調整為true,當收到MBean的suspend呼叫時將會一次性acquisitionSemaphore.acquireUninterruptibly從此訊號量獲取給定數目MAXPERMITS 10000的許可,在提供這些許可前一直將執行緒阻塞。之後HikariPool的getConnection方法獲取不到連線,阻塞在suspendResumeLock.acquire(),除非resume方法釋放給定數目MAX_PERMITS 10000的許可,將其傳回到訊號量

close

連線有借有還,連線檢測的task也是會關閉的。 我們看一下com.zaxxer.hikari.pool.ProxyConnection原始碼,

  1. // **********************************************************************

  2.   //              "Overridden" java.sql.Connection Methods

  3.   // **********************************************************************

  4.   /** {@inheritDoc} */

  5.   @Override

  6.   public final void close() throws SQLException {

  7.      // Closing statements can cause connection eviction, so this must run before the conditional below

  8.      closeStatements();

  9.      if (delegate != ClosedConnection.CLOSED_CONNECTION) {

  10.         leakTask.cancel();

  11.         try {

  12.            if (isCommitStateDirty && !isAutoCommit) {

  13.               delegate.rollback();

  14.               lastAccess = currentTime();

  15.               LOGGER.debug("{} - Executed rollback on connection {} due to dirty commit state on close().", poolEntry.getPoolName(), delegate);

  16.            }

  17.            if (dirtyBits != 0) {

  18.               poolEntry.resetConnectionState(this, dirtyBits);

  19.               lastAccess = currentTime();

  20.            }

  21.            delegate.clearWarnings();

  22.         }

  23.         catch (SQLException e) {

  24.            // when connections are aborted, exceptions are often thrown that should not reach the application

  25.            if (!poolEntry.isMarkedEvicted()) {

  26.               throw checkException(e);

  27.            }

  28.         }

  29.         finally {

  30.            delegate = ClosedConnection.CLOSED_CONNECTION;

  31.            poolEntry.recycle(lastAccess);

  32.         }

  33.      }

  34.   }

在connection的close的時候,delegate != ClosedConnection.CLOSED_CONNECTION時會呼叫leakTask.cancel();取消檢測連線洩露的task。

在closeStatements中也會關閉:

  1. @SuppressWarnings("EmptyTryBlock")

  2.   private synchronized void closeStatements() {

  3.      final int size = openStatements.size();

  4.      if (size > 0) {

  5.         for (int i = 0; i < size && delegate != ClosedConnection.CLOSED_CONNECTION; i++) {

  6.            try (Statement ignored = openStatements.get(i)) {

  7.               // automatic resource cleanup

  8.            }

  9.            catch (SQLException e) {

  10.               LOGGER.warn("{} - Connection {} marked as broken because of an exception closing open statements during Connection.close()",

  11.                           poolEntry.getPoolName(), delegate);

  12.               leakTask.cancel();

  13.               poolEntry.evict("(exception closing Statements during Connection.close())");

  14.               delegate = ClosedConnection.CLOSED_CONNECTION;

  15.            }

  16.         }

  17.         openStatements.clear();

  18.      }

  19.   }

在checkException中也會關閉

  1. final SQLException checkException(SQLException sqle) {

  2.      SQLException nse = sqle;

  3.      for (int depth = 0; delegate != ClosedConnection.CLOSED_CONNECTION && nse != null && depth < 10; depth++) {

  4.         final String sqlState = nse.getSQLState();

  5.         if (sqlState != null && sqlState.startsWith("08") || ERROR_STATES.contains(sqlState) || ERROR_CODES.contains(nse.getErrorCode())) {

  6.            // broken connection

  7.            LOGGER.warn("{} - Connection {} marked as broken because of SQLSTATE({}), ErrorCode({})",

  8.                        poolEntry.getPoolName(), delegate, sqlState, nse.getErrorCode(), nse);

  9.            leakTask.cancel();

  10.            poolEntry.evict("(connection is broken)");

  11.            delegate = ClosedConnection.CLOSED_CONNECTION;

  12.         }

  13.         else {

  14.            nse = nse.getNextException();

  15.         }

  16.      }

  17.      return sqle;

  18.   }

在com.zaxxer.hikari.pool.HikariPool的evictConnection中,也會關閉任務

  1.  /**

  2.    * Evict a Connection from the pool.

  3.    *

  4.    * @param connection the Connection to evict (actually a {@link ProxyConnection})

  5.    */

  6.   public void evictConnection(Connection connection) {

  7.      ProxyConnection proxyConnection = (ProxyConnection) connection;

  8.      proxyConnection.cancelLeakTask();

  9.      try {

  10.         softEvictConnection(proxyConnection.getPoolEntry(), "(connection evicted by user)", !connection.isClosed() /* owner */);

  11.      }

  12.      catch (SQLException e) {

  13.         // unreachable in HikariCP, but we're still forced to catch it

  14.      }

  15.   }

小結關閉任務如下圖所示:

測試模擬

我們可以根據本文對於leakDetectionThreshold的分析用測試包裡的com.zaxxer.hikari.pool.MiscTest程式碼進行適當引數調整模擬連線洩漏情況,測試程式碼如下:

  1. /**

  2. * @author Brett Wooldridge

  3. */

  4. public class MiscTest {

  5.   @Test

  6.   public void testLogWriter() throws SQLException {

  7.      HikariConfig config = newHikariConfig();

  8.      config.setMinimumIdle(0);

  9.      config.setMaximumPoolSize(4);

  10.      config.setDataSourceClassName("com.zaxxer.hikari.mocks.StubDataSource");

  11.      setConfigUnitTest(true);

  12.      try (HikariDataSource ds = new HikariDataSource(config)) {

  13.         PrintWriter writer = new PrintWriter(System.out);

  14.         ds.setLogWriter(writer);

  15.         assertSame(writer, ds.getLogWriter());

  16.         assertEquals("testLogWriter", config.getPoolName());

  17.      }

  18.      finally

  19.      {

  20.         setConfigUnitTest(false);

  21.      }

  22.   }

  23.   @Test

  24.   public void testInvalidIsolation() {

  25.      try {

  26.         getTransactionIsolation("INVALID");

  27.         fail();

  28.      }

  29.      catch (Exception e) {

  30.         assertTrue(e instanceof IllegalArgumentException);

  31.      }

  32.   }

  33.   @Test

  34.   public void testCreateInstance() {

  35.      try {

  36.         createInstance("invalid", null);

  37.         fail();

  38.      }

  39.      catch (RuntimeException e) {

  40.         assertTrue(e.getCause() instanceof ClassNotFoundException);

  41.      }

  42.   }

  43.   @Test

  44.   public void testLeakDetection() throws Exception {

  45.      ByteArrayOutputStream baos = new ByteArrayOutputStream();

  46.      try (PrintStream ps = new PrintStream(baos, true)) {

  47.         setSlf4jTargetStream(Class.forName("com.zaxxer.hikari.pool.ProxyLeakTask"), ps);

  48.         setConfigUnitTest(true);

  49.         HikariConfig config = newHikariConfig();

  50.         config.setMinimumIdle(0);

  51.         config.setMaximumPoolSize(4);

  52.         config.setThreadFactory(Executors.defaultThreadFactory());

  53.         config.setMetricRegistry(null);

  54.         config.setLeakDetectionThreshold(TimeUnit.SECONDS.toMillis(4));

  55.         config.setDataSourceClassName("com.zaxxer.hikari.mocks.StubDataSource");

  56.         try (HikariDataSource ds = new HikariDataSource(config)) {

  57.            setSlf4jLogLevel(HikariPool.class, Level.DEBUG);

  58.            getPool(ds).logPoolState();

  59.            try (Connection connection = ds.getConnection()) {

  60.               quietlySleep(SECONDS.toMillis(4));

  61.               connection.close();

  62.               quietlySleep(SECONDS.toMillis(1));

  63.               ps.close();

  64.               String s = new String(baos.toByteArray());

  65.               assertNotNull("Exception string was null", s);

  66.               assertTrue("Expected exception to contain 'Connection leak detection' but contains *" + s + "*", s.contains("Connection leak detection"));

  67.            }

  68.         }

  69.         finally

  70.         {

  71.            setConfigUnitTest(false);

  72.            setSlf4jLogLevel(HikariPool.class, Level.INFO);

  73.         }

  74.      }

  75.   }

  76. }

當程式碼執行到了quietlySleep(SECONDS.toMillis(4));時直接按照預期拋異常Apparent connection leak detected。

緊接著在close的過程中執行到了delegate != ClosedConnection.CLOSED_CONNECTION來進行leakTask.cancel()

完整的測試輸出模擬過程如下所示:

Spark/Scala連線池洩漏問題排查

金融中心大資料決策資料組的同學找到反饋了一個問題:

我們在同一個jvm 需要連線多個資料庫時,發現總體上 從連線池borrow 的 connection 多於 歸還的,一段時間後 連線池就會報出 Caused by: java.sql.SQLTransientConnectionException: HikariPool-0 - Connection is not available, request timed out after 30000ms的異常。

使用者使用的spark的場景有點特殊,單機上開的連結很小,但是有很多機器都會去連。使用者在一個jvm中就只會併發1個連結。

  1. maximumPoolSize: 5

  2. minimumIdle: 2

程式也會出現block的情況,發現是執行mysql時出現的, mysql show processlist;發現大多停留在query end的情況,程式 thread dump 行程 持有monitor的執行緒。

DBA介入之後發現存在slow sql。

當然,這個問題出了是寫頻繁導致的,一次寫入的量有點大,每一個sql都巨大走的batch,寫入的 records 數在每秒 30-50條,一個record 有70多個欄位。一個解決方式是把 binlog 移到 ssd 盤;還有一個方式是innodbflushlogattrx_commit把這個引數改成0了,估計可能會提高20%~30%。

修複瞭如上一些問題之後,又發現使用者反饋的問題,加了leakDetectionThreshold,得出的結論是存在連線洩漏(從池中借用後連線沒有關閉)。

針對這個問題,我們懷疑的連線池洩漏的點要麼在hikari中,要麼在spark/scala中。採用排除法使用了druid,依然存在這個問題;於是我們就去翻spark這塊的程式碼,仔細分析之後定位到了問題:

因為scala map懶載入,一開始mapPartitions都落在一個stage中,我們調整程式碼toList之後result.iterator就分在獨立的stage中,連線池洩漏問題就不再存在。

根本原因可以參見《Spark : How to use mapPartition and create/close connection per partition 》: https://stackoverflow.com/questions/36545579/spark-how-to-use-mappartition-and-create-close-connection-per-partition/36545821#36545821

一開始以為這是一個連線池問題,或者是spark問題,但是實際上透過leakDetectionThreshold的定位,我們得知實際上這是一個scala問題 :)

參考資料

  • https://segmentfault.com/a/1190000013092894

END

贊(0)

分享創造快樂