上QQ阅读APP看书,第一时间看更新
3.3 创建metadataCleaner
SparkContext为了保持对所有持久化的RDD的跟踪,使用类型是TimeStamped-WeakValueHashMap的persistentRdds缓存。metadataCleaner的功能是清除过期的持久化RDD。创建metadataCleaner的代码如下。
private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]] private[spark] val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
我们仔细看看MetadataCleaner的实现,见代码清单3-14。
代码清单3-14 MetadataCleaner的实现
private[spark] class MetadataCleaner( cleanerType: MetadataCleanerType.MetadataCleanerType, cleanupFunc: (Long) => Unit, conf: SparkConf) extends Logging { val name = cleanerType.toString private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType) private val periodSeconds = math.max(10, delaySeconds / 10) private val timer = new Timer(name + " cleanup timer", true) private val task = new TimerTask { override def run() { try { cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000)) logInfo("Ran metadata cleaner for " + name) } catch { case e: Exception => logError("Error running cleanup task for " + name, e) } } } if (delaySeconds > 0) { timer.schedule(task, delaySeconds * 1000, periodSeconds * 1000) } def cancel() { timer.cancel() } }
从MetadataCleaner的实现可以看出其实质是一个用TimerTask实现的定时器,不断调用cleanupFunc:(Long)=>Unit这样的函数参数。构造metadataCleaner时的函数参数是cleanup,用于清理persistentRdds中的过期内容,代码如下。
private[spark] def cleanup(cleanupTime: Long) { persistentRdds.clearOldValues(cleanupTime) }