Skip to content

Latest commit

 

History

History
149 lines (122 loc) · 4.26 KB

知识点小记.md

File metadata and controls

149 lines (122 loc) · 4.26 KB

知识点小记

spark方向

Eventloop

spark中经常能看到eventloop这个概念,例如

/**
 * An event loop that accepts events together with a delay and hands each event to `onReceive`
 * on a single dedicated daemon thread once that delay has expired.
 *
 * @param name name of the event thread; also used in log and error messages
 * @tparam E type of the events processed by this loop
 */
private[spark] abstract class DelayEventLoop[E](name: String) extends Logging {

  // Events ordered by expiry time; `take()` only returns an event whose delay has elapsed.
  private val eventQueue: DelayQueue[DelayEvent] = new DelayQueue[DelayEvent]()

  // Flipped to true exactly once by `stop()`; checked by the event thread on every iteration.
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          // Blocks until the head event has expired (or the thread is interrupted by `stop()`).
          val event = eventQueue.take().e
          try {
            onReceive(event)
          } catch {
            case NonFatal(receiveError) =>
              try {
                onError(receiveError)
              } catch {
                // A failing error handler must not kill the event thread: log and carry on.
                case NonFatal(handlerError) =>
                  logError("Unexpected error in " + name, handlerError)
              }
          }
        }
      } catch {
        case _: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }

  /**
   * Invoke `onStart()` and then start the event thread.
   *
   * @throws IllegalStateException if `stop()` has already been called
   */
  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

  /**
   * Stop the loop: interrupt the event thread, wait for it to exit, then invoke `onStop()`.
   * Only the first call has any effect; later calls return silently.
   */
  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      var onStopCalled = false
      try {
        eventThread.join()
        // Call onStop after the event thread exits to make sure onReceive happens before onStop
        onStopCalled = true
        onStop()
      } catch {
        case _: InterruptedException =>
          // Restore the interrupt flag for the caller.
          Thread.currentThread().interrupt()
          if (!onStopCalled) {
            // The exception came from `eventThread.join()`, so `onStop` has not run yet.
            // Otherwise it came from `onStop` itself and must not be invoked a second time.
            onStop()
          }
      }
    } else {
      // Keep quiet to allow calling `stop` multiple times.
    }
  }

  /**
   * Put the event into the event queue. The event thread will process it once the delay expires.
   *
   * @param event   the event to deliver to `onReceive`
   * @param delayMs delay in milliseconds, measured from the moment of this call
   */
  def post(event: E, delayMs: Long): Unit = {
    eventQueue.put(new DelayEvent(event, delayMs))
  }

  /**
   * Return whether the event thread is currently alive, i.e. it has been started and has not
   * yet terminated.
   */
  def isActive: Boolean = eventThread.isAlive

  /**
   * Invoked when `start()` is called but before the event thread starts.
   */
  protected def onStart(): Unit = {}

  /**
   * Invoked when `stop()` is called and the event thread exits.
   */
  protected def onStop(): Unit = {}

  /**
   * Invoked in the event thread for every event taken from the event queue.
   *
   * Note: Should avoid calling blocking actions in `onReceive`, or the event thread will be blocked
   * and cannot process events in time. If you want to call some blocking actions, run them in
   * another thread.
   */
  protected def onReceive(event: E): Unit

  /**
   * Invoked if `onReceive` throws any non fatal error. Any non fatal error thrown from `onError`
   * itself is logged and otherwise ignored.
   */
  protected def onError(e: Throwable): Unit

  /**
   * Queue entry pairing an event with the wall-clock time at which it becomes available.
   *
   * @param e       the wrapped event
   * @param delayMs delay in milliseconds from construction until the event expires
   */
  private class DelayEvent(val e: E, delayMs: Long) extends Delayed {
    // Absolute expiry time in epoch milliseconds.
    private val time = System.currentTimeMillis() + delayMs

    /**
     * Remaining delay expressed in the requested unit. `DelayQueue` asks for nanoseconds, so the
     * millisecond difference must be converted; returning raw milliseconds would make the queue
     * wait far too briefly and spin until the event expires.
     */
    override def getDelay(unit: TimeUnit): Long = {
      unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS)
    }

    /**
     * Order events by expiry time, earliest first. The cast relies on `eventQueue` holding only
     * `DelayEvent` instances, which its element type guarantees.
     */
    override def compareTo(o: Delayed): Int = {
      java.lang.Long.compare(time, o.asInstanceOf[DelayEvent].time)
    }
  }

}

基本模型:

  • 内部持有一个延迟队列eventQueue

  • 一个Thread类型的成员eventThread,它是守护线程,不会阻止JVM退出:当所有非守护线程(例如主线程)都结束后,它会随JVM一起结束

  • stopped,AtomicBoolean类型,调用stop方法后会通过cas把它设为true

  • start方法,执行onstart方法,start eventThread

  • stop方法:通过cas把stopped设为true,打断eventThread,等待eventThread结束之后再调用onStop方法,这样可以确保onStop一定在eventThread结束(即最后一次onReceive完成)之后才被调用

  • eventThread循环:先检查是否被stop了,没有的话就从eventQueue中取出一个事件(take会阻塞到队首事件的延迟到期),执行onReceive方法处理事件;被stop里的interrupt打断之后就退出

  • Onstart[eventThread开始前调用]、OnStop[eventThread结束后调用]、Onreceive[处理事件]

java方向

java工具类

时间工具类

https://segmentfault.com/a/1190000040371876