spark中经常能看到eventloop这个概念,例如
/**
 * An event loop that accepts events together with a delay and hands each event to
 * `onReceive` on a dedicated daemon thread once its delay has elapsed.
 *
 * Lifecycle: `start()` runs `onStart()` and then starts the event thread; `stop()` interrupts
 * the event thread, waits for it to exit and then runs `onStop()`. Events that are still queued
 * when the loop is stopped are dropped.
 *
 * @param name name given to the event thread and used in error log messages
 * @tparam E type of the events handled by this loop
 */
private[spark] abstract class DelayEventLoop[E](name: String) extends Logging {

  /** Pending events, ordered by the absolute time at which they become due. */
  private val eventQueue: DelayQueue[DelayEvent] = new DelayQueue[DelayEvent]()

  /** Flipped to true exactly once, by the first call to `stop()`. */
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          // Blocks until the head of the queue is due (or until the thread is interrupted).
          val event = eventQueue.take().e
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                // A failing error handler must not kill the loop; log it and keep going.
                case NonFatal(handlerError) =>
                  logError(s"Unexpected error in $name", handlerError)
              }
          }
        }
      } catch {
        case _: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError(s"Unexpected error in $name", e)
      }
    }
  }

  /**
   * Run `onStart()` and then start the event thread.
   *
   * @throws IllegalStateException if `stop()` has already been called
   */
  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(s"$name has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

  /**
   * Stop the loop: interrupt the event thread, wait for it to exit and then run `onStop()`.
   * Only the first call has any effect; later calls return silently.
   */
  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      var onStopCalled = false
      try {
        eventThread.join()
        // Call onStop after the event thread exits to make sure onReceive happens before onStop
        onStopCalled = true
        onStop()
      } catch {
        case _: InterruptedException =>
          // Restore the caller's interrupt status before deciding whether onStop still has to run.
          Thread.currentThread().interrupt()
          if (!onStopCalled) {
            // The interrupt came from `eventThread.join()`. Otherwise, we should not call
            // `onStop` since it's already called.
            onStop()
          }
      }
    } else {
      // Keep quiet to allow calling `stop` multiple times.
    }
  }

  /**
   * Put the event into the event queue. The event thread will process it once `delayMs`
   * milliseconds have elapsed.
   *
   * @param event   the event to deliver to `onReceive`
   * @param delayMs delay in milliseconds, counted from the moment of this call
   */
  def post(event: E, delayMs: Long): Unit = {
    eventQueue.put(new DelayEvent(event, delayMs))
  }

  /**
   * Return if the event thread has already been started but not yet stopped.
   */
  def isActive: Boolean = eventThread.isAlive

  /**
   * Invoked when `start()` is called but before the event thread starts.
   */
  protected def onStart(): Unit = {}

  /**
   * Invoked when `stop()` is called and the event thread exits.
   */
  protected def onStop(): Unit = {}

  /**
   * Invoked in the event thread when polling events from the event queue.
   *
   * Note: Should avoid calling blocking actions in `onReceive`, or the event thread will be
   * blocked and cannot process events in time. If you want to call some blocking actions, run
   * them in another thread.
   */
  protected def onReceive(event: E): Unit

  /**
   * Invoked if `onReceive` throws any non fatal error. Any non fatal error thrown from `onError`
   * is logged and otherwise ignored.
   */
  protected def onError(e: Throwable): Unit

  /**
   * Queue entry pairing an event with the absolute wall-clock time at which it becomes due.
   *
   * @param e       the wrapped event
   * @param delayMs delay in milliseconds relative to the construction time of this entry
   */
  private class DelayEvent(val e: E, delayMs: Long) extends Delayed {

    /** Due time in epoch milliseconds, saturated at Long.MaxValue instead of overflowing. */
    private val time: Long = {
      val now = System.currentTimeMillis()
      if (delayMs > Long.MaxValue - now) Long.MaxValue else now + delayMs
    }

    /**
     * Remaining delay expressed in `unit`. The `Delayed` contract requires the result to be in
     * the requested unit; returning raw milliseconds makes the queue underestimate the wait
     * whenever it asks for a finer unit, so `take()` wakes up repeatedly before the event is due.
     */
    override def getDelay(unit: TimeUnit): Long = {
      unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS)
    }

    /** Order entries by due time; foreign `Delayed` values are compared by remaining delay. */
    override def compareTo(o: Delayed): Int = o match {
      case other: DelayEvent =>
        java.lang.Long.compare(time, other.time)
      case _ =>
        java.lang.Long.compare(
          getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS))
    }
  }
}
基本模型:
-
内部持有一个延迟队列eventQueue
-
一个Thread类型的成员eventThread,它是守护线程,不会阻止JVM退出:当所有非守护线程(例如主线程)都结束时,它也随之结束
-
stopped,AtomicBoolean类型,调用stop方法后会通过cas把它设为true
-
start方法:如果已经调用过stop则抛出IllegalStateException;否则先执行onStart方法,再启动eventThread
-
stop方法:通过cas设置stopped,打断eventThread,等到eventThread结束之后,调用onStop方法,这样是确保在eventThread结束之后onStop调用
-
eventThread循环:先检查是否被stop了,没有的话就从eventQueue中取出一个事件(队列为空或事件尚未到期时会阻塞),执行onReceive方法处理事件;等到被stop里的interrupt打断之后就退出
-
onStart[eventThread开始前调用]、onStop[eventThread结束后调用]、onReceive[处理事件]、onError[onReceive抛出非致命异常时调用]
时间工具类