
HikariCP Source Code Analysis: leakDetectionThreshold, and Fixing a Spark/Scala Connection Pool Leak in Practice


Abstract: original source https://mp.weixin.qq.com/s/_ghOnuwbLHOkqGKgzWdLVw, by 「渣渣王子」. Reposts are welcome as long as this note is kept. Thanks!

  • 1. Concepts

  • 2. Source code analysis

    • 2.1.1 HikariConfig

    • 2.1.2 HouseKeeper

    • 2.1.3 Summary

    • 2.2.1 getConnection

    • 2.2.2 leakTaskFactory, ProxyLeakTaskFactory, ProxyLeakTask

    • 2.2.3 close

  • 3. Simulating a leak in a test

  • 4. Troubleshooting a Spark/Scala connection pool leak

  • 5. References

  • 6. Series articles


Concepts

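This property controls how long, in milliseconds, a connection may be out of the pool before a message is logged indicating a possible connection leak. The default is 0, which means leak detection is disabled.
If the value is greater than 0 and the code is not running as a unit test, HikariCP additionally checks (leakDetectionThreshold < SECONDS.toMillis(2)) || (leakDetectionThreshold > maxLifetime && maxLifetime > 0); if that condition holds, the value is reset to 0. In other words, to take effect the value must be at least 2 seconds and, when maxLifetime > 0, no larger than maxLifetime (whose default is 1800000 ms = 30 minutes).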
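To make the rule concrete, here is a minimal sketch that restates it as a standalone Java method. The class and method names are hypothetical (this is not HikariCP's API); the logic only mirrors the condition quoted above.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper mirroring the validation rule described above; not part of HikariCP.
final class LeakThresholdRule {
    private LeakThresholdRule() {}

    /**
     * Returns the effective leak-detection threshold in ms (0 = disabled).
     * Outside unit tests, values below 2 s, or above a positive maxLifetime, are reset to 0.
     */
    static long effectiveThreshold(long leakDetectionThreshold, long maxLifetime, boolean unitTest) {
        if (leakDetectionThreshold > 0 && !unitTest) {
            boolean tooSmall = leakDetectionThreshold < TimeUnit.SECONDS.toMillis(2);
            boolean exceedsLifetime = maxLifetime > 0 && leakDetectionThreshold > maxLifetime;
            if (tooSmall || exceedsLifetime) {
                return 0L;
            }
        }
        return leakDetectionThreshold;
    }

    public static void main(String[] args) {
        long maxLifetime = 1_800_000L; // default maxLifetime: 30 minutes
        System.out.println(effectiveThreshold(0L, maxLifetime, false));         // 0: disabled by default
        System.out.println(effectiveThreshold(1_500L, maxLifetime, false));     // 0: below 2 s
        System.out.println(effectiveThreshold(2_000L, maxLifetime, false));     // 2000: lowest accepted value
        System.out.println(effectiveThreshold(3_600_000L, maxLifetime, false)); // 0: exceeds maxLifetime
        System.out.println(effectiveThreshold(3_600_000L, 0L, false));          // 3600000: no upper bound when maxLifetime is 0
    }
}
```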

leakDetectionThreshold 
This property controls the amount of time that a connection can be out of the pool before a message is logged indicating a possible connection leak. A value of 0 means leak detection is disabled. Lowest acceptable value for enabling leak detection is 2000 (2 seconds). Default: 0

For an outline of the other configuration options, see the article 【Light Chaser Series】HikariCP Default Configuration.

Source Code Analysis

Let's first look at an outline diagram showing where leakDetectionThreshold is used:

Write

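Recall the previous article, 【Light Chaser Series】HikariCP Source Code Analysis: The Story of Hikari 2.7.5 Told Through validationTimeout, which showed that validationTimeout is written in two places: the PoolBase constructor and the HouseKeeper thread.
leakDetectionThreshold follows the same pattern: besides being set at construction time, it is also handled by the HouseKeeper thread.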

HikariConfig

leakDetectionThreshold is initialized in com.zaxxer.hikari.HikariConfig:

  @Override
  public void setLeakDetectionThreshold(long leakDetectionThresholdMs)
  {
     this.leakDetectionThreshold = leakDetectionThresholdMs;
  }

The validateNumerics method implements the validation rule described above and in the official documentation:

if (leakDetectionThreshold > 0 && !unitTest) {
        if (leakDetectionThreshold < SECONDS.toMillis(2) || (leakDetectionThreshold > maxLifetime && maxLifetime > 0)) {
           LOGGER.warn("{} - leakDetectionThreshold is less than 2000ms or more than maxLifetime, disabling it.", poolName);
           leakDetectionThreshold = 0;
        }
     }

This method is called by HikariConfig#validate, and HikariConfig#validate is in turn called by the HikariDataSource constructor that takes a specified configuration:

   /**
   * Construct a HikariDataSource with the specified configuration.  The
   * {@link HikariConfig} is copied and the pool is started by invoking this
   * constructor.
   *
   * The {@link HikariConfig} can be modified without affecting the HikariDataSource
   * and used to initialize another HikariDataSource instance.
   *
   * @param configuration a HikariConfig instance
   */

  public HikariDataSource(HikariConfig configuration)
  {
     configuration.validate();
     configuration.copyStateTo(this);
     LOGGER.info("{} - Starting...", configuration.getPoolName());
     pool = fastPathPool = new HikariPool(this);
     LOGGER.info("{} - Start completed.", configuration.getPoolName());
     this.seal();
  }

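It is also used in getConnection(): for a HikariDataSource created with the no-argument constructor, validate() runs when the pool is lazily created on the first getConnection() call: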

 // ***********************************************************************
  //                          DataSource methods
  // ***********************************************************************
  /** {@inheritDoc} */
  @Override
  public Connection getConnection() throws SQLException
  {
     if (isClosed()) {
        throw new SQLException("HikariDataSource " + this + " has been closed.");
     }
     if (fastPathPool != null) {
        return fastPathPool.getConnection();
     }
     // See http://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java
     HikariPool result = pool;
     if (result == null) {
        synchronized (this) {
           result = pool;
           if (result == null) {
              validate();
              LOGGER.info("{} - Starting...", getPoolName());
              try {
                 pool = result = new HikariPool(this);
                 this.seal();
              }
              catch (PoolInitializationException pie) {
                 if (pie.getCause() instanceof SQLException) {
                    throw (SQLException) pie.getCause();
                 }
                 else {
                    throw pie;
                 }
              }
              LOGGER.info("{} - Start completed.", getPoolName());
           }
        }
     }
     return result.getConnection();
  }

The double-checked locking used here is worth pointing out. For background, see https://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java, whose Java example is:

// Works with acquire/release semantics for volatile in Java 1.5 and later
// Broken under Java 1.4 and earlier semantics for volatile
class Foo {
   private volatile Helper helper;
   public Helper getHelper() {
       Helper localRef = helper;
       if (localRef == null) {
           synchronized(this) {
               localRef = helper;
               if (localRef == null) {
                   helper = localRef = new Helper();
               }
           }
       }
       return localRef;
   }
   // other functions and members...
}

HouseKeeper

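Next, look at the code in com.zaxxer.hikari.pool.HikariPool. The HouseKeeper thread tries to maintain the minimum number of idle connections in the pool and keeps refreshing values such as connectionTimeout and validationTimeout that may have been changed via MBean. leakDetectionThreshold is managed the same way, through HouseKeeper's call to leakTask.updateLeakDetectionThreshold(config.getLeakDetectionThreshold()).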

/**
   * The house keeping task to retire and maintain minimum idle connections.
   */

  private final class HouseKeeper implements Runnable
  {
     private volatile long previous = plusMillis(currentTime(), -HOUSEKEEPING_PERIOD_MS);
     @Override
     public void run()
     {
        try {
           // refresh timeouts in case they changed via MBean
           connectionTimeout = config.getConnectionTimeout();
           validationTimeout = config.getValidationTimeout();
           leakTask.updateLeakDetectionThreshold(config.getLeakDetectionThreshold());
           final long idleTimeout = config.getIdleTimeout();
           final long now = currentTime();
           // Detect retrograde time, allowing +128ms as per NTP spec.
           if (plusMillis(now, 128) < plusMillis(previous, HOUSEKEEPING_PERIOD_MS)) {
              LOGGER.warn("{} - Retrograde clock change detected (housekeeper delta={}), soft-evicting connections from pool.",
                          poolName, elapsedDisplayString(previous, now));
              previous = now;
              softEvictConnections();
              fillPool();
              return;
           }
           else if (now > plusMillis(previous, (3 * HOUSEKEEPING_PERIOD_MS) / 2)) {
              // No point evicting for forward clock motion, this merely accelerates connection retirement anyway
              LOGGER.warn("{} - Thread starvation or clock leap detected (housekeeper delta={}).", poolName, elapsedDisplayString(previous, now));
           }
           previous = now;
           String afterPrefix = "Pool ";
           if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) {
              logPoolState("Before cleanup ");
              afterPrefix = "After cleanup  ";
               final List<PoolEntry> notInUse = connectionBag.values(STATE_NOT_IN_USE);
              int removed = 0;
              for (PoolEntry entry : notInUse) {
                 if (elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) {
                    closeConnection(entry, "(connection has passed idleTimeout)");
                    if (++removed > config.getMinimumIdle()) {
                       break;
                    }
                 }
              }
           }
           logPoolState(afterPrefix);
           fillPool(); // Try to maintain minimum connections
        }
        catch (Exception e) {
           LOGGER.error("Unexpected exception in housekeeping task", e);
        }
     }
  }

One more note on HouseKeeper: the executor that runs it is initialized in the constructor of com.zaxxer.hikari.pool.HikariPool via this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();

 /**
   * Create/initialize the Housekeeping service {@link ScheduledExecutorService}.  If the user specified an Executor
   * to be used in the {@link HikariConfig}, then we use that.  If no Executor was specified (typical), then create
   * an Executor and configure it.
   *
   * @return either the user specified {@link ScheduledExecutorService}, or the one we created
   */

  private ScheduledExecutorService initializeHouseKeepingExecutorService()
  {
     if (config.getScheduledExecutor() == null) {
        final ThreadFactory threadFactory = Optional.ofNullable(config.getThreadFactory()).orElse(new DefaultThreadFactory(poolName + " housekeeper", true));
        final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory, new ThreadPoolExecutor.DiscardPolicy());
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        executor.setRemoveOnCancelPolicy(true);
        return executor;
     }
     else {
        return config.getScheduledExecutor();
     }
  }

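A brief explanation: ScheduledThreadPoolExecutor is a subclass of ThreadPoolExecutor and therefore inherits all of its features, adding the ability to run tasks after a delay or periodically; it is the class to reach for when a program needs scheduled tasks.
You can configure what ScheduledThreadPoolExecutor does with pending tasks when shutdown() is called. By default, already-scheduled delayed tasks still run after shutdown(); calling setExecuteExistingDelayedTasksAfterShutdownPolicy(false) changes this so that pending delayed tasks are not executed once shutdown() has been called.
setRemoveOnCancelPolicy(true) makes a cancelled task be removed from the executor's work queue immediately (the default is false, in which case a cancelled task stays queued until its delay expires). Removing cancelled tasks right away keeps the queue from piling up with them, which matters here because a leak-detection task is scheduled for every borrowed connection and is normally cancelled when the connection is returned.
These two settings show what the author intended for the HouseKeeper executor.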
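The effect of setRemoveOnCancelPolicy is easy to observe with the JDK alone. The following illustration uses only standard java.util.concurrent calls (the class name is made up): it configures an executor like the housekeeper executor above, minus the thread factory, schedules a task one hour out, cancels it, and reports how many tasks are still queued.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative only: shows what setRemoveOnCancelPolicy changes for a cancelled delayed task.
final class RemoveOnCancelDemo {
    /** Schedules one far-future task, cancels it, and returns how many tasks remain queued. */
    static int queuedAfterCancel(boolean removeOnCancel) {
        // One thread and DiscardPolicy, as in initializeHouseKeepingExecutorService().
        ScheduledThreadPoolExecutor executor =
                new ScheduledThreadPoolExecutor(1, new ThreadPoolExecutor.DiscardPolicy());
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        executor.setRemoveOnCancelPolicy(removeOnCancel);
        try {
            ScheduledFuture<?> future = executor.schedule(() -> { }, 1, TimeUnit.HOURS);
            future.cancel(false);
            return executor.getQueue().size();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("removeOnCancel=false -> queued: " + queuedAfterCancel(false)); // 1
        System.out.println("removeOnCancel=true  -> queued: " + queuedAfterCancel(true));  // 0
    }
}
```

With the policy left at false, the cancelled task lingers in the queue until its hour-long delay would have expired; with true, it is removed at cancellation time.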

Summary

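Hikari initializes some configuration parameters in constructors and updates them dynamically in HouseKeeper. The dynamic updates rely on HikariConfigMXBean, together with the ScheduledThreadPoolExecutor task-scheduling pool that periodically refreshes the configuration.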

Taking com.zaxxer.hikari.HikariConfig as the example, the main properties that can be modified at runtime are:

   // Properties changeable at runtime through the HikariConfigMXBean
  //
  private volatile long connectionTimeout;
  private volatile long validationTimeout;
  private volatile long idleTimeout;
  private volatile long leakDetectionThreshold;
  private volatile long maxLifetime;
  private volatile int maxPoolSize;
  private volatile int minIdle;
  private volatile String username;
  private volatile String password;

The main properties that cannot be changed at runtime are:

  // Properties NOT changeable at runtime
  //
  private long initializationFailTimeout;
  private String catalog;
  private String connectionInitSql;
  private String connectionTestQuery;
  private String dataSourceClassName;
  private String dataSourceJndiName;
  private String driverClassName;
  private String jdbcUrl;
  private String poolName;
  private String schema;
  private String transactionIsolationName;
  private boolean isAutoCommit;
  private boolean isReadOnly;
  private boolean isIsolateInternalQueries;
  private boolean isRegisterMbeans;
  private boolean isAllowPoolSuspension;
  private DataSource dataSource;
  private Properties dataSourceProperties;
  private ThreadFactory threadFactory;
  private ScheduledExecutorService scheduledExecutor;
  private MetricsTrackerFactory metricsTrackerFactory;
  private Object metricRegistry;
  private Object healthCheckRegistry;
  private Properties healthCheckProperties;

Read

getConnection

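When the core method getConnection in com.zaxxer.hikari.pool.HikariPool returns, it calls poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now).
Note that a ProxyLeakTask is attached when the proxy connection is created.
The idea behind leak detection is that every borrowed connection must be returned. For each connection borrowed, Hikari schedules a delayed task, and cancels that task when the connection is returned, when an exception marks the connection as broken, or when the user manually calls evictConnection. If the task has not been cancelled by the time leakDetectionThreshold elapses, it runs and logs a leak warning.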
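The borrow/schedule/cancel cycle can be sketched with a plain ScheduledExecutorService. This is a hypothetical, heavily simplified model, not HikariCP's code: Borrowed stands in for the proxy connection, a counter replaces the WARN log, and quietlySleep is a local helper named after the one the test below uses.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the borrow/schedule/cancel idea; not HikariCP code.
final class LeakDetectionSketch {
    private final ScheduledExecutorService housekeeper = Executors.newSingleThreadScheduledExecutor();
    private final long leakDetectionThresholdMs;
    final AtomicInteger leaksReported = new AtomicInteger();

    LeakDetectionSketch(long leakDetectionThresholdMs) {
        this.leakDetectionThresholdMs = leakDetectionThresholdMs;
    }

    /** Stand-in for a borrowed proxy connection that carries its own leak task. */
    final class Borrowed implements AutoCloseable {
        // Scheduled at borrow time; it only runs if close() has not cancelled it first.
        private final ScheduledFuture<?> leakTask = housekeeper.schedule(
                () -> {
                    leaksReported.incrementAndGet();
                    System.err.println("Connection leak detection triggered");
                },
                leakDetectionThresholdMs, TimeUnit.MILLISECONDS);

        @Override
        public void close() {
            leakTask.cancel(false); // returning the connection cancels the pending report
        }
    }

    Borrowed borrow() {
        return new Borrowed();
    }

    void shutdown() {
        housekeeper.shutdownNow();
    }

    /** Sleeps without throwing; restores the interrupt flag if interrupted. */
    static void quietlySleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        LeakDetectionSketch pool = new LeakDetectionSketch(100L);
        try (Borrowed returnedInTime = pool.borrow()) {
            // used and returned well within the threshold: its task is cancelled
        }
        Borrowed leaked = pool.borrow(); // never closed, so its task will fire
        quietlySleep(300L);
        System.out.println("leaks reported: " + pool.leaksReported.get());
        pool.shutdown();
    }
}
```

Running main should report a single leak: the first connection is closed in time, so its task is cancelled, while the second is never closed and its task fires after 100 ms.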

   /**
   * Get a connection from the pool, or timeout after the specified number of milliseconds.
   *
   * @param hardTimeout the maximum time to wait for a connection from the pool
   * @return a java.sql.Connection instance
   * @throws SQLException thrown if a timeout occurs trying to obtain a connection
   */

  public Connection getConnection(final long hardTimeout) throws SQLException
  {
     suspendResumeLock.acquire();
     final long startTime = currentTime();
     try {
        long timeout = hardTimeout;
        do {
           PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
           if (poolEntry == null) {
              break; // We timed out... break and throw exception
           }
           final long now = currentTime();
           if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > ALIVE_BYPASS_WINDOW_MS && !isConnectionAlive(poolEntry.connection))) {
              closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
              timeout = hardTimeout - elapsedMillis(startTime);
           }
           else {
              metricsTracker.recordBorrowStats(poolEntry, startTime);
              return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
           }
        } while (timeout > 0L);
        metricsTracker.recordBorrowTimeoutStats(startTime);
        throw createTimeoutException(startTime);
     }
     catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
     }
     finally {
        suspendResumeLock.release();
     }
  }

leakTaskFactory、ProxyLeakTaskFactory、ProxyLeakTask

The HikariPool constructor initializes leakTaskFactory and houseKeepingExecutorService:

this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();
this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService);
this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, HOUSEKEEPING_PERIOD_MS, MILLISECONDS);

com.zaxxer.hikari.pool.ProxyLeakTaskFactory follows a factory-style design the author uses often. Here is its source:

/**
* A factory for {@link ProxyLeakTask} Runnables that are scheduled in the future to report leaks.
*
* @author Brett Wooldridge
* @author Andreas Brenk
*/

class ProxyLeakTaskFactory
{
  private ScheduledExecutorService executorService;
  private long leakDetectionThreshold;
  ProxyLeakTaskFactory(final long leakDetectionThreshold, final ScheduledExecutorService executorService)
  {
     this.executorService = executorService;
     this.leakDetectionThreshold = leakDetectionThreshold;
  }
  ProxyLeakTask schedule(final PoolEntry poolEntry)
  {
     return (leakDetectionThreshold == 0) ? ProxyLeakTask.NO_LEAK : scheduleNewTask(poolEntry);
  }
  void updateLeakDetectionThreshold(final long leakDetectionThreshold)
  {
     this.leakDetectionThreshold = leakDetectionThreshold;
  }
  private ProxyLeakTask scheduleNewTask(PoolEntry poolEntry) {
     ProxyLeakTask task = new ProxyLeakTask(poolEntry);
     task.schedule(executorService, leakDetectionThreshold);
     return task;
  }
}

If leakDetectionThreshold is 0, i.e. leak detection is disabled, schedule returns ProxyLeakTask.NO_LEAK; otherwise it creates a new ProxyLeakTask that fires after leakDetectionThreshold milliseconds.
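The factory plus shared no-op instance can be boiled down to the sketch below. LeakTaskFactory and LeakTask are hypothetical, simplified stand-ins for ProxyLeakTaskFactory and ProxyLeakTask (no PoolEntry, no stack-trace logging); they only show why returning a shared NO_LEAK object lets callers call cancel() unconditionally.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical simplification of the factory + shared no-op ("null object") idea; not HikariCP's classes.
final class LeakTaskFactory {
    private final ScheduledExecutorService executor;
    private final long thresholdMs;

    LeakTaskFactory(long thresholdMs, ScheduledExecutorService executor) {
        this.thresholdMs = thresholdMs;
        this.executor = executor;
    }

    /** Disabled (0): hand back the shared no-op, so nothing is allocated or scheduled. */
    LeakTask schedule() {
        if (thresholdMs == 0) {
            return LeakTask.NO_LEAK;
        }
        LeakTask task = new LeakTask();
        task.schedule(executor, thresholdMs);
        return task;
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        LeakTaskFactory disabled = new LeakTaskFactory(0L, executor);
        System.out.println(disabled.schedule() == LeakTask.NO_LEAK); // true
        LeakTask task = new LeakTaskFactory(60_000L, executor).schedule();
        task.cancel(); // "connection returned" long before the 60 s threshold
        executor.shutdownNow();
    }
}

class LeakTask {
    /** Shared instance whose methods do nothing; safe to cancel() without null or flag checks. */
    static final LeakTask NO_LEAK = new LeakTask() {
        @Override void schedule(ScheduledExecutorService executor, long thresholdMs) { }
        @Override void cancel() { }
    };

    private ScheduledFuture<?> future;

    void schedule(ScheduledExecutorService executor, long thresholdMs) {
        future = executor.schedule(() -> System.err.println("possible connection leak"),
                thresholdMs, TimeUnit.MILLISECONDS);
    }

    void cancel() {
        future.cancel(false);
    }
}
```

The same idea reappears in SuspendResumeLock.FAUX_LOCK: when a feature is off, a do-nothing subclass replaces the real object, so the hot path needs no extra branching.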

Next, the source of com.zaxxer.hikari.pool.ProxyLeakTask:

/**
* A Runnable that is scheduled in the future to report leaks.  The ScheduledFuture is
* cancelled if the connection is closed before the leak time expires.
*
* @author Brett Wooldridge
*/

class ProxyLeakTask implements Runnable
{
  private static final Logger LOGGER = LoggerFactory.getLogger(ProxyLeakTask.class);
  static final ProxyLeakTask NO_LEAK;
  private ScheduledFuture<?> scheduledFuture;
  private String connectionName;
  private Exception exception;
  private String threadName;
  private boolean isLeaked;
  static
  {
     NO_LEAK = new ProxyLeakTask() {
        @Override
        void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold) {}
        @Override
        public void run() {}
        @Override
        public void cancel() {}
     };
  }
  ProxyLeakTask(final PoolEntry poolEntry)
  {
     this.exception = new Exception("Apparent connection leak detected");
     this.threadName = Thread.currentThread().getName();
     this.connectionName = poolEntry.connection.toString();
  }
  private ProxyLeakTask()
  {
  }
  void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold)
  {
     scheduledFuture = executorService.schedule(this, leakDetectionThreshold, TimeUnit.MILLISECONDS);
  }
  /** {@inheritDoc} */
  @Override
  public void run()
  {
     isLeaked = true;
     final StackTraceElement[] stackTrace = exception.getStackTrace();
     final StackTraceElement[] trace = new StackTraceElement[stackTrace.length - 5];
     System.arraycopy(stackTrace, 5, trace, 0, trace.length);
     exception.setStackTrace(trace);
     LOGGER.warn("Connection leak detection triggered for {} on thread {}, stack trace follows", connectionName, threadName, exception);
  }
  void cancel()
  {
     scheduledFuture.cancel(false);
     if (isLeaked) {
        LOGGER.info("Previously reported leaked connection {} on thread {} was returned to the pool (unleaked)", connectionName, threadName);
     }
  }
}

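Every method of the NO_LEAK instance is a no-op.
When a real task fires, it does not throw anything. Instead, run() marks the connection as leaked and logs a WARN message together with the Exception("Apparent connection leak detected") that was created in the constructor, i.e. on the borrowing thread at borrow time. After the first five frames are trimmed, that exception's stack trace shows where the connection was borrowed.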
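Why creating the Exception at borrow time works can be shown with the JDK alone. In this hypothetical illustration, captureBorrowSite plays the role of the ProxyLeakTask constructor and borrowFromBusinessCode the application code; trimmed copies the System.arraycopy trick from run(), but skips one frame instead of HikariCP's five, since the right number depends on the actual call depth.

```java
// Hypothetical illustration of capturing and trimming the borrow site; not HikariCP code.
final class BorrowSiteDemo {
    /** Plays the role of the ProxyLeakTask constructor: the Exception records the current call stack. */
    static Exception captureBorrowSite() {
        return new Exception("Apparent connection leak detected");
    }

    /** Plays the role of application code that borrows a connection. */
    static Exception borrowFromBusinessCode() {
        return captureBorrowSite();
    }

    /** Drops the first framesToSkip frames, as ProxyLeakTask.run() does with its internal frames. */
    static Exception trimmed(Exception e, int framesToSkip) {
        StackTraceElement[] stack = e.getStackTrace();
        int skip = Math.min(framesToSkip, stack.length);
        StackTraceElement[] trace = new StackTraceElement[stack.length - skip];
        System.arraycopy(stack, skip, trace, 0, trace.length);
        e.setStackTrace(trace);
        return e;
    }

    public static void main(String[] args) throws InterruptedException {
        Exception site = trimmed(borrowFromBusinessCode(), 1); // skip our own "internal" frame
        // Report it later from another thread, as the housekeeper thread would.
        Thread reporter = new Thread(() ->
                System.out.println(site.getMessage() + ", borrowed at " + site.getStackTrace()[0].getMethodName()));
        reporter.start();
        reporter.join();
        // prints: Apparent connection leak detected, borrowed at borrowFromBusinessCode
    }
}
```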

Does this look familiar? It is exactly the same design as the one discussed in 【Light Chaser Series】HikariCP Source Code Analysis: allowPoolSuspension:

this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;

isAllowPoolSuspension defaults to false, in which case the constructor simply uses SuspendResumeLock.FAUX_LOCK; only when isAllowPoolSuspension is true is a real SuspendResumeLock created.

com.zaxxer.hikari.util.SuspendResumeLock provides two versions, a fake one and a real one backed by a java.util.concurrent.Semaphore:

/**
* This class implements a lock that can be used to suspend and resume the pool.  It
* also provides a faux implementation that is used when the feature is disabled that
* hopefully gets fully "optimized away" by the JIT.
*
* @author Brett Wooldridge
*/

public class SuspendResumeLock
{
  public static final SuspendResumeLock FAUX_LOCK = new SuspendResumeLock(false) {
     @Override
     public void acquire() {}
     @Override
     public void release() {}
     @Override
     public void suspend() {}
     @Override
     public void resume() {}
  };
  private static final int MAX_PERMITS = 10000;
  private final Semaphore acquisitionSemaphore;
  /**
   * Default constructor
   */

  public SuspendResumeLock()
  {
     this(true);
  }
  private SuspendResumeLock(final boolean createSemaphore)
  {
     acquisitionSemaphore = (createSemaphore ? new Semaphore(MAX_PERMITS, true) : null);
  }
  public void acquire()
  {
     acquisitionSemaphore.acquireUninterruptibly();
  }
  public void release()
  {
     acquisitionSemaphore.release();
  }
  public void suspend()
  {
     acquisitionSemaphore.acquireUninterruptibly(MAX_PERMITS);
  }
  public void resume()
  {
     acquisitionSemaphore.release(MAX_PERMITS);
  }
}

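Because isAllowPoolSuspension defaults to false, FAUX_LOCK's methods are all empty and its acquisitionSemaphore is null. If isAllowPoolSuspension is set to true, a suspend call received through the MBean invokes acquisitionSemaphore.acquireUninterruptibly(MAX_PERMITS), taking all 10000 permits at once and blocking until they are all available, i.e. until every in-flight getConnection call has released its permit. From then on, HikariPool.getConnection cannot obtain a connection and blocks in suspendResumeLock.acquire() until resume releases the 10000 permits back to the semaphore.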
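The suspend/resume behaviour can be reproduced with java.util.concurrent.Semaphore alone. SuspendableGate below is a hypothetical, trimmed copy of the SuspendResumeLock logic above; its tryEnter() method is an addition (not in HikariCP) that uses tryAcquire() so that a call which would block in getConnection shows up as false instead of hanging.

```java
import java.util.concurrent.Semaphore;

// Simplified, hypothetical re-creation of SuspendResumeLock's semaphore logic.
final class SuspendableGate {
    private static final int MAX_PERMITS = 10_000;
    private final Semaphore acquisitionSemaphore = new Semaphore(MAX_PERMITS, true);

    /** Non-blocking stand-in for acquire(): true if a getConnection call could proceed. */
    boolean tryEnter() {
        return acquisitionSemaphore.tryAcquire();
    }

    /** Matches release(): called when a getConnection call finishes. */
    void exit() {
        acquisitionSemaphore.release();
    }

    /** Takes every permit; blocks until all in-flight callers have called exit(). */
    void suspend() {
        acquisitionSemaphore.acquireUninterruptibly(MAX_PERMITS);
    }

    /** Gives all permits back so callers can proceed again. */
    void resume() {
        acquisitionSemaphore.release(MAX_PERMITS);
    }

    public static void main(String[] args) {
        SuspendableGate gate = new SuspendableGate();
        System.out.println(gate.tryEnter()); // true: pool running
        gate.exit();
        gate.suspend();
        System.out.println(gate.tryEnter()); // false: pool suspended
        gate.resume();
        System.out.println(gate.tryEnter()); // true: pool resumed
        gate.exit();
    }
}
```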

close

Borrowed connections are returned, and their leak-detection tasks are cancelled along the way.
Here is the source of com.zaxxer.hikari.pool.ProxyConnection:

 // **********************************************************************
  //              "Overridden" java.sql.Connection Methods
  // **********************************************************************
  /** {@inheritDoc} */
  @Override
  public final void close() throws SQLException
  {
     // Closing statements can cause connection eviction, so this must run before the conditional below
     closeStatements();
     if (delegate != ClosedConnection.CLOSED_CONNECTION) {
        leakTask.cancel();
        try {
           if (isCommitStateDirty && !isAutoCommit) {
              delegate.rollback();
              lastAccess = currentTime();
              LOGGER.debug("{} - Executed rollback on connection {} due to dirty commit state on close().", poolEntry.getPoolName(), delegate);
           }
           if (dirtyBits != 0) {
              poolEntry.resetConnectionState(this, dirtyBits);
              lastAccess = currentTime();
           }
           delegate.clearWarnings();
        }
        catch (SQLException e) {
           // when connections are aborted, exceptions are often thrown that should not reach the application
           if (!poolEntry.isMarkedEvicted()) {
              throw checkException(e);
           }
        }
        finally {
           delegate = ClosedConnection.CLOSED_CONNECTION;
           poolEntry.recycle(lastAccess);
        }
     }
  }

When the connection is closed and delegate != ClosedConnection.CLOSED_CONNECTION, leakTask.cancel() is called to cancel the leak-detection task.

closeStatements also cancels it, when closing an open statement fails and the connection is evicted:

  @SuppressWarnings("EmptyTryBlock")
  private synchronized void closeStatements()
  {
     final int size = openStatements.size();
     if (size > 0) {
        for (int i = 0; i < size && delegate != ClosedConnection.CLOSED_CONNECTION; i++) {
           try (Statement ignored = openStatements.get(i)) {
              // automatic resource cleanup
           }
           catch (SQLException e) {
              LOGGER.warn("{} - Connection {} marked as broken because of an exception closing open statements during Connection.close()",
                          poolEntry.getPoolName(), delegate);
              leakTask.cancel();
              poolEntry.evict("(exception closing Statements during Connection.close())");
              delegate = ClosedConnection.CLOSED_CONNECTION;
           }
        }
        openStatements.clear();
     }
  }

checkException cancels it as well, when the SQL state or error code indicates a broken connection:

  final SQLException checkException(SQLException sqle)
  {
     SQLException nse = sqle;
     for (int depth = 0; delegate != ClosedConnection.CLOSED_CONNECTION && nse != null && depth < 10; depth++) {
        final String sqlState = nse.getSQLState();
        if (sqlState != null && sqlState.startsWith("08") || ERROR_STATES.contains(sqlState) || ERROR_CODES.contains(nse.getErrorCode())) {
           // broken connection
           LOGGER.warn("{} - Connection {} marked as broken because of SQLSTATE({}), ErrorCode({})",
                       poolEntry.getPoolName(), delegate, sqlState, nse.getErrorCode(), nse);
           leakTask.cancel();
           poolEntry.evict("(connection is broken)");
           delegate = ClosedConnection.CLOSED_CONNECTION;
        }
        else {
           nse = nse.getNextException();
        }
     }
     return sqle;
  }

evictConnection in com.zaxxer.hikari.pool.HikariPool cancels the task too:

   /**
   * Evict a Connection from the pool.
   *
   * @param connection the Connection to evict (actually a {@link ProxyConnection})
   */

  public void evictConnection(Connection connection)
  {
     ProxyConnection proxyConnection = (ProxyConnection) connection;
     proxyConnection.cancelLeakTask();
     try {
        softEvictConnection(proxyConnection.getPoolEntry(), "(connection evicted by user)", !connection.isClosed() /* owner */);
     }
     catch (SQLException e) {
        // unreachable in HikariCP, but we're still forced to catch it
     }
  }

The places where the task is cancelled are summarized in the figure below:

Simulating a Leak in a Test

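Using this analysis of leakDetectionThreshold, we can simulate a connection leak by adjusting the parameters of com.zaxxer.hikari.pool.MiscTest in HikariCP's test sources. The test code is as follows: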

/**
* @author Brett Wooldridge
*/

public class MiscTest
{
  @Test
  public void testLogWriter() throws SQLException
  {
     HikariConfig config = newHikariConfig();
     config.setMinimumIdle(0);
     config.setMaximumPoolSize(4);
     config.setDataSourceClassName("com.zaxxer.hikari.mocks.StubDataSource");
     setConfigUnitTest(true);
     try (HikariDataSource ds = new HikariDataSource(config)) {
        PrintWriter writer = new PrintWriter(System.out);
        ds.setLogWriter(writer);
        assertSame(writer, ds.getLogWriter());
        assertEquals("testLogWriter", config.getPoolName());
     }
     finally
     {
        setConfigUnitTest(false);
     }
  }
  @Test
  public void testInvalidIsolation()
  {
     try {
        getTransactionIsolation("INVALID");
        fail();
     }
     catch (Exception e) {
        assertTrue(e instanceof IllegalArgumentException);
     }
  }
  @Test
  public void testCreateInstance()
  {
     try {
        createInstance("invalid", null);
        fail();
     }
     catch (RuntimeException e) {
        assertTrue(e.getCause() instanceof ClassNotFoundException);
     }
  }
  @Test
  public void testLeakDetection() throws Exception
  {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     try (PrintStream ps = new PrintStream(baos, true)) {
        setSlf4jTargetStream(Class.forName("com.zaxxer.hikari.pool.ProxyLeakTask"), ps);
        setConfigUnitTest(true);
        HikariConfig config = newHikariConfig();
        config.setMinimumIdle(0);
        config.setMaximumPoolSize(4);
        config.setThreadFactory(Executors.defaultThreadFactory());
        config.setMetricRegistry(null);
        config.setLeakDetectionThreshold(TimeUnit.SECONDS.toMillis(4));
        config.setDataSourceClassName("com.zaxxer.hikari.mocks.StubDataSource");
        try (HikariDataSource ds = new HikariDataSource(config)) {
           setSlf4jLogLevel(HikariPool.class, Level.DEBUG);
           getPool(ds).logPoolState();
           try (Connection connection = ds.getConnection()) {
              quietlySleep(SECONDS.toMillis(4));
              connection.close();
              quietlySleep(SECONDS.toMillis(1));
              ps.close();
              String s = new String(baos.toByteArray());
              assertNotNull("Exception string was null", s);
              assertTrue("Expected exception to contain 'Connection leak detection' but contains *" + s + "*", s.contains("Connection leak detection"));
           }
        }
        finally
        {
           setConfigUnitTest(false);
           setSlf4jLogLevel(HikariPool.class, Level.INFO);
        }
     }
  }
}

When execution reaches quietlySleep(SECONDS.toMillis(4)), the 4-second leak task fires as expected and logs the warning with Apparent connection leak detected.

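Then, during close(), the check delegate != ClosedConnection.CLOSED_CONNECTION passes and leakTask.cancel() is executed; because the task has already fired, cancel() also logs that the previously reported leaked connection was returned to the pool.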

The full test output is shown below:

Troubleshooting a Spark/Scala Connection Pool Leak

A colleague from the big-data decision data team at our finance center reported a problem:

When we connect to multiple databases from the same JVM, we find that overall more connections are borrowed from the pool than are returned, and after a while the pool reports:
Caused by: java.sql.SQLTransientConnectionException: HikariPool-0 – Connection is not available, request timed out after 30000ms

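The user's Spark scenario is a little unusual: each machine opens only a few connections, but many machines connect. Within a single JVM, the user only ever uses one connection at a time.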

maximumPoolSize: 5
minimumIdle: 2

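The program also blocked at times, and this turned out to happen while executing MySQL statements.
SHOW PROCESSLIST in MySQL showed most statements sitting in the query end state, and a thread dump of the process showed threads holding a monitor.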

Once the DBA got involved, slow SQL was found.

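This part of the problem was caused by frequent, heavy writes: every SQL statement was a huge batch, 30-50 records were written per second, and each record had more than 70 fields. One fix is to move the binlog to an SSD; another is to set innodb_flush_log_at_trx_commit to 0, which was estimated to improve throughput by roughly 20%-30%, at the cost of possibly losing about the last second of transactions if the server crashes.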

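After those issues were fixed, the user's original problem still showed up. With leakDetectionThreshold enabled, we concluded that there was a connection leak: connections borrowed from the pool were not being closed.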

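We suspected the leak was either in Hikari or in Spark/Scala. Ruling things out, we switched to Druid and the problem remained, so we went through the Spark code and, after careful analysis, found the cause: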

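Because Scala's map is lazy, all of the mapPartitions work initially ran inside a single stage. After we changed the code to materialize the results with toList and then return result.iterator, the work ran in a separate stage and the connection pool leak went away.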
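The pattern from the answer linked below (consume the partition with toList while the connection is open, release the connection, then return a new iterator) can be sketched outside Spark, because the same lazy-iterator trap exists with any iterator. The article does not show the faulty Spark/Scala code, so this sketch assumes one plausible leaky shape, in which the connection is only given back once the lazy iterator is exhausted. It is written in Java to match the rest of the article, and FakePool and the upper-casing "lookup" are hypothetical stand-ins for the pool and a database call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-ins; the article's actual Spark/Scala code is not shown.
final class PartitionSketch {
    /** Fake pool that only counts connections currently borrowed. */
    static final class FakePool {
        int borrowed;
        void borrow() { borrowed++; }
        void giveBack() { borrowed--; }
    }

    /** Assumed leaky shape: work is lazy and the connection is returned only when the iterator is exhausted. */
    static Iterator<String> lazyPartition(FakePool pool, Iterator<String> records) {
        pool.borrow();
        return new Iterator<String>() {
            private boolean returned;

            @Override
            public boolean hasNext() {
                boolean more = records.hasNext();
                if (!more && !returned) {
                    returned = true;
                    pool.giveBack();
                }
                return more;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return records.next().toUpperCase(); // stands in for a per-record DB lookup
            }
        };
    }

    /** Fix in the spirit of toList + result.iterator: materialize, always give back, return a new iterator. */
    static Iterator<String> eagerPartition(FakePool pool, Iterator<String> records) {
        pool.borrow();
        try {
            List<String> result = new ArrayList<>();
            records.forEachRemaining(r -> result.add(r.toUpperCase()));
            return result.iterator();
        } finally {
            pool.giveBack();
        }
    }

    public static void main(String[] args) {
        FakePool lazyPool = new FakePool();
        lazyPartition(lazyPool, Arrays.asList("a", "b", "c").iterator()).next(); // consumer stops early
        System.out.println("lazy: " + lazyPool.borrowed + " connection(s) still borrowed");  // 1

        FakePool eagerPool = new FakePool();
        eagerPartition(eagerPool, Arrays.asList("a", "b", "c").iterator()).next();
        System.out.println("eager: " + eagerPool.borrowed + " connection(s) still borrowed"); // 0
    }
}
```

In the eager version the connection's lifetime no longer depends on how, or whether, the downstream consumer drains the iterator.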

For the root cause, see "Spark : How to use mapPartition and create/close connection per partition":
https://stackoverflow.com/questions/36545579/spark-how-to-use-mappartition-and-create-close-connection-per-partition/36545821#36545821

At first we thought this was a connection pool problem or a Spark problem, but pinpointing it with leakDetectionThreshold showed that it was actually a Scala problem :)

References

https://segmentfault.com/a/1190000013092894



