ACE Study 0.1 文档

5. 定时器管理

«  4. Notification 通知时间派发处理   ::   目录   ::   6. 信号量的管理  »

本章目录

5. 定时器管理

5.1. 定时器类继承图

_images/a03884.png

其中 ACE_Abstract_Timer_Queue 类为所有定时器队列的基类,提供一个统一的接口,而不管加锁策略、回调机制、内部实现。该类的主要目的是为Reactor实现了一个统一的接口类,从而是Reactor无须了解定时器队列的内部实现机制。 ACE_Timer_Queue_T<TYPE, FUNCTOR, ACE_LOCK, TIME_POLICY> 为定时器队列提供了公共的接口,从该类派生的类包括 ACE_Timer_Hash_T, ACE_Timer_Heap_T , ACE_Timer_List_T, ACE_Timer_Wheel_T 4中类型的时间队列的实现。本例中主要分析 ACE_Timer_Heap_T 类的主要数据结构和实现机制。

ACE_Timer_Queue_T 类定义原型

1
2
template <class TYPE,class FUNCTOR,class ACE_LOCK, typename TIME_POLICY=ACE_Default_Time_Policy>
        class ACE_Timer_Queue_T : public ACE_Timer_Queue_Upcall_Base<TYPE,FUNCTOR>

5.3. 定时器的数据结构

ACE_Timer_Queue_T 类成员变量

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
  /// Synchronization variable for ACE_Timer_Queue.
  /// @note The right name would be lock_, but HP/C++ will choke on that!
  ACE_LOCK mutex_;

  /// Class that implements a free list
  ACE_Free_List<ACE_Timer_Node_T<TYPE> > *free_list_;

  /// The policy to return the current time of day
  TIME_POLICY time_policy_;

  /// Flag to delete only if the class created the <free_list_>
  bool const delete_free_list_;

private:

  /// Returned by <calculate_timeout>.
  ACE_Time_Value timeout_;

  /// Adjusts for timer skew in various clocks.
  ACE_Time_Value timer_skew_;

5.3.1. Heap Timer基础知识

min-heap,就是 child node value 一定小于 parent node value 的 binary-tree。因此,获取 min value,只需要 pop root node 即可。

_images/heap_timer.jpg

在ACE_Time_Heap类中,每个节点结构如下:

1
2
3
4
5
6
7
ACE_Event_Handler * type_;            // 时间处理的回调类
const void *act_;                     // 异步完成的token
ACE_Time_Value timer_value_;          // 要超时的时间值
ACE_Time_Value interval_;             // 周期定时器的周期值
ACE_Timer_Node_T<TYPE> *prev_;        // 前一个节点
ACE_Timer_Node_T<TYPE> *next_;        // 后一个节点
long timer_id_;                       // 定时器的id,主要用于取消

5.4. 定时器与Select_Reactor的分发集成

ace/Select_Reactor_T.cpp dispatch 函数中调用 dispatch_timer_handlers 函数处理定时器队列。

Select_Reactor_T.cpp dispatch_timer_handlers 函数

1
2
3
4
5
6
7
8
template <class ACE_SELECT_REACTOR_TOKEN> int
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch_timer_handlers
  (int &number_of_handlers_dispatched)
{
  number_of_handlers_dispatched += this->timer_queue_->expire ();

  return 0;
}

5.4.1. 定时器队列的 expire 函数

Timer_Queue_T.inl expire 函数

1
2
3
4
5
6
7
8
template <class TYPE, class FUNCTOR, class ACE_LOCK, typename TIME_POLICY> ACE_INLINE int
ACE_Timer_Queue_T<TYPE, FUNCTOR, ACE_LOCK, TIME_POLICY>::expire (void)
{
  if (!this->is_empty ())
    return this->expire (this->gettimeofday_static () + timer_skew_);
  else
    return 0;
}

Timer_Queue_T.cpp expire 函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Run the <handle_timeout> method for all Timers whose values are <=
// <cur_time>.
template <class TYPE, class FUNCTOR, class ACE_LOCK, typename TIME_POLICY> int
ACE_Timer_Queue_T<TYPE, FUNCTOR, ACE_LOCK, TIME_POLICY>::expire (const ACE_Time_Value &cur_time)
{
  ACE_TRACE ("ACE_Timer_Queue_T::expire");
  ACE_MT (ACE_GUARD_RETURN (ACE_LOCK, ace_mon, this->mutex_, -1));

  // Keep looping while there are timers remaining and the earliest
  // timer is <= the <cur_time> passed in to the method.

  if (this->is_empty ())
    return 0;

  int number_of_timers_expired = 0;
  int result = 0;

  ACE_Timer_Node_Dispatch_Info_T<TYPE> info;

  while ((result = this->dispatch_info_i (cur_time, info)) != 0)
    {
      ACE_MT (ACE_Reverse_Lock<ACE_LOCK> rev_lk(this->mutex_));
      ACE_MT (ACE_GUARD_RETURN (ACE_Reverse_Lock<ACE_LOCK>, rmon, rev_lk, -1));

      const void *upcall_act = 0;

      this->preinvoke (info, cur_time, upcall_act);

      this->upcall (info, cur_time);

      this->postinvoke (info, cur_time, upcall_act);

      ++number_of_timers_expired;

    }

  ACE_UNUSED_ARG (result);
  return number_of_timers_expired;
}

其中 29行 为调用该超时节点的 handle_timeout 函数的包装。

5.4.2. 定时器队列的 dispatch_info_i 函数

Timer_Queue_T.cpp dispatch_info_i 函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
template <class TYPE, class FUNCTOR, class ACE_LOCK, typename TIME_POLICY> int
ACE_Timer_Queue_T<TYPE, FUNCTOR, ACE_LOCK, TIME_POLICY>::dispatch_info_i (const ACE_Time_Value &cur_time,
                                                             ACE_Timer_Node_Dispatch_Info_T<TYPE> &info)
{
  ACE_TRACE ("ACE_Timer_Queue_T::dispatch_info_i");

  if (this->is_empty ())
    return 0;

  ACE_Timer_Node_T<TYPE> *expired = 0;

  if (this->earliest_time () <= cur_time)
    {
      expired = this->remove_first ();

      // Get the dispatch info
      expired->get_dispatch_info (info);

      // Check if this is an interval timer.
      if (expired->get_interval () > ACE_Time_Value::zero)
        {
          // Make sure that we skip past values that have already
          // "expired".
          this->recompute_next_abs_interval_time (expired, cur_time);

          // Since this is an interval timer, we need to reschedule
          // it.
          this->reschedule (expired);
        }
      else
        {
          // Call the factory method to free up the node.
          this->free_node (expired);
        }

      return 1;
    }

  return 0;
}

5.4.3. 定时器队列的 upcall 函数

Timer_Queue_T.inl upcall 函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
template <class TYPE, class FUNCTOR, class ACE_LOCK, typename TIME_POLICY> ACE_INLINE void
ACE_Timer_Queue_T<TYPE, FUNCTOR, ACE_LOCK, TIME_POLICY>::upcall (ACE_Timer_Node_Dispatch_Info_T<TYPE> &info,
                                                    const ACE_Time_Value &cur_time)
{
  this->upcall_functor ().timeout (*this,
                                   info.type_,
                                   info.act_,
                                   info.recurring_timer_,
                                   cur_time);
}

其中 14行获取最小超时时间节点,行17获取超时节点的相关信息,20-28 行为处理周期定时器。

5.5. 定时器注册

5.5.1. schedule 函数

ACE_Timer_Queue_T schedule 函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
template <class TYPE, class FUNCTOR, class ACE_LOCK, typename TIME_POLICY> long
ACE_Timer_Queue_T<TYPE, FUNCTOR, ACE_LOCK, TIME_POLICY>::schedule (const TYPE &type,
                                                      const void *act,
                                                      const ACE_Time_Value &future_time,
                                                      const ACE_Time_Value &interval)
{
  ACE_MT (ACE_GUARD_RETURN (ACE_LOCK, ace_mon, this->mutex_, -1));

  // Schedule the timer.
  long const result =
    this->schedule_i (type,
                      act,
                      future_time,
                      interval);

  // Return on failure.
  if (result == -1)
    return result;

  // Inform upcall functor of successful registration.
  this->upcall_functor ().registration (*this,
                                        type,
                                        act);

  // Return result;
  return result;
}

5.5.2. schedule_i 函数

schedule_i 函数在其继承类实现,本例以 ACE_Timer_Heap_T 类为例分析。

ACE_Timer_Heap 的定义

1
2
3
4
typedef ACE_Timer_Heap_T<ACE_Event_Handler *,
                         ACE_Event_Handler_Handle_Timeout_Upcall,
                         ACE_SYNCH_RECURSIVE_MUTEX>
        ACE_Timer_Heap;

Timer_Heap_T.cpp schedule_i 函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
template <class TYPE, class FUNCTOR, class ACE_LOCK, typename TIME_POLICY>
long
ACE_Timer_Heap_T<TYPE, FUNCTOR, ACE_LOCK, TIME_POLICY>::schedule_i (
  const TYPE &type,
  const void *act,
  const ACE_Time_Value &future_time,
  const ACE_Time_Value &interval)
{
  ACE_TRACE ("ACE_Timer_Heap_T::schedule_i");

  if ((this->cur_size_ + this->cur_limbo_) < this->max_size_)
    {
      // Obtain the next unique sequence number.
      long const timer_id = this->timer_id ();

      // Obtain the memory to the new node.
      ACE_Timer_Node_T<TYPE> *temp = 0;

      ACE_ALLOCATOR_RETURN (temp,
                            this->alloc_node (),
                            -1);
      temp->set (type,
                 act,
                 future_time,
                 interval,
                 0,
                 timer_id);

      this->insert (temp);
      return timer_id;
    }
  else
    return -1;
}

分配的节点类型为: ACE_Timer_Node_T<TYPE>,然后设置相关信息,调用 insert 函数插入。

Timer_Queue_Iterator.h ACE_Timer_Node_T 类成员变量

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
  /// Type of object stored in the Queue
  TYPE type_;

  /// Asynchronous completion token associated with the timer.
  const void *act_;

  /// Time until the timer expires.
  ACE_Time_Value timer_value_;

  /// If this is a periodic timer this holds the time until the next
  /// timeout.
  ACE_Time_Value interval_;

  /// Pointer to previous timer.
  ACE_Timer_Node_T<TYPE> *prev_;

  /// Pointer to next timer.
  ACE_Timer_Node_T<TYPE> *next_;

  /// Id of this timer (used to cancel timers before they expire).
  long timer_id_;

5.5.3. insert 函数

ACE_Timer_Heap_T.cpp insert 函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
template <class TYPE, class FUNCTOR, class ACE_LOCK, typename TIME_POLICY>
void
ACE_Timer_Heap_T<TYPE, FUNCTOR, ACE_LOCK, TIME_POLICY>::insert (
  ACE_Timer_Node_T<TYPE> *new_node)
{
  if (this->cur_size_ + this->cur_limbo_ + 2 >= this->max_size_)
    this->grow_heap ();

  this->reheap_up (new_node,
                   this->cur_size_,
                   ACE_HEAP_PARENT (this->cur_size_));
  this->cur_size_++;
}

5.5.4. reheap_up 函数

ACE_Timer_Heap_T.cpp reheap_up 函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
template <class TYPE, class FUNCTOR, class ACE_LOCK, typename TIME_POLICY>
void
ACE_Timer_Heap_T<TYPE, FUNCTOR, ACE_LOCK, TIME_POLICY>::reheap_up (
  ACE_Timer_Node_T<TYPE> *moved_node,
  size_t slot,
  size_t parent)
{
  // Restore the heap property after an insertion.

  while (slot > 0)
    {
      // If the parent node is greater than the <moved_node> we need
      // to copy it down.
      if (moved_node->get_timer_value ()
          < this->heap_[parent]->get_timer_value ())
        {
          this->copy (slot, this->heap_[parent]);
          slot = parent;
          parent = ACE_HEAP_PARENT (slot);
        }
      else
        break;
    }

  // Insert the new node into its proper resting place in the heap and
  // update the corresponding slot in the parallel <timer_ids> array.
  this->copy (slot,
              moved_node);
}

Timer_Heap_T.h heap_ 变量定义

1
2
3
4
5
6
7
  /**
   * Current contents of the Heap, which is organized as a "heap" of
   * ACE_Timer_Node *'s.  In this context, a heap is a "partially
   * ordered, almost complete" binary tree, which is stored in an
   * array.
   */
  ACE_Timer_Node_T<TYPE> **heap_;

5.5.5. upcall_functor registration函数

Timer_Queue_T.h upcall_functor ().registration 成员变量

1
  FUNCTOR *upcall_functor_;

根据 schedule_i 函数 章节可知 FUNCTOR 定位对象 ACE_Event_Handler_Handle_Timeout_Upcall

Timer_Queue_T.h upcall_functor ().registration 成员变量

1
2
3
4
5
6
7
8
9
ACE_INLINE int
ACE_Event_Handler_Handle_Timeout_Upcall::
registration (ACE_Timer_Queue &,
              ACE_Event_Handler *event_handler,
              const void *)
{
  event_handler->add_reference ();
  return 0;
}

upcall_functor ().registration 仅仅对 ACE_Event_Handler *event_handler 对象维护了引用计数的功能,无其他具体功能实现。

5.6. 定时器取消

ACE_Timer_Heap_T.cpp cancel 函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// Locate and remove the single timer with a value of @a timer_id from
// the timer queue.

template <class TYPE, class FUNCTOR, class ACE_LOCK, typename TIME_POLICY>
int
ACE_Timer_Heap_T<TYPE, FUNCTOR, ACE_LOCK, TIME_POLICY>::cancel (long timer_id,
                                                   const void **act,
                                                   int dont_call)
{
  ACE_TRACE ("ACE_Timer_Heap_T::cancel");
  ACE_MT (ACE_GUARD_RETURN (ACE_LOCK, ace_mon, this->mutex_, -1));

  // Locate the ACE_Timer_Node that corresponds to the timer_id.

  // Check to see if the timer_id is out of range
  if (timer_id < 0
      || (size_t) timer_id > this->max_size_)
    return 0;

  ssize_t timer_node_slot = this->timer_ids_[timer_id];

  // Check to see if timer_id is still valid.
  if (timer_node_slot < 0)
    return 0;

  if (timer_id != this->heap_[timer_node_slot]->get_timer_id ())
    {
      ACE_ASSERT (timer_id == this->heap_[timer_node_slot]->get_timer_id ());
      return 0;
    }
  else
    {
      ACE_Timer_Node_T<TYPE> *temp =
        this->remove (timer_node_slot);

      // Call the close hooks.
      int cookie = 0;

      // cancel_type() called once per <type>.
      this->upcall_functor ().cancel_type (*this,
                                           temp->get_type (),
                                           dont_call,
                                           cookie);

      // cancel_timer() called once per <timer>.
      this->upcall_functor ().cancel_timer (*this,
                                            temp->get_type (),
                                            dont_call,
                                            cookie);

      if (act != 0)
        *act = temp->get_act ();

      this->free_node (temp);
      return 1;
    }
}

5.7. 定时器队列注意事项

5.7.1. ACE时间源

  1. ACE_OS::gettimeofday() 返回 ACE_Timer_Value 的静态方法。
  2. ACE_High_Res_Timer::gettimeofday, 返回“OS特有的高精度定时器的值”的静态方法,并且会将该值转化成 ACE_Timer_Value 单位。该定时器常常以系统CPU启动的“嘀嗒”数,而不是以实际的挂钟时间为基础。可以避免系统时间调整对于定时器的影响。

5.7.2. 实时线程定时器

对于实时性要求高的定时器,可以采用 ACE_Thread_Timer_Queue_Adapter 来实现,该类启动一个单独线程来进行定时器触发。具体使用样例可以参见:ace\examples\timer_queue\Thread_Timer_Queue_Test.cpp

5.8. 各种定时器队列对比

名称 描述
ACE_Timer_Heap 数组中实现部分有序,几乎完整的二叉树。 一般和最坏O(lgn)。
ACE_Timer_Wheel 使用含有循环缓冲区的timing wheel。 一般情况下以O(1)时间调度,取消和分配定时器。最坏需要O(n)。
ACE_Timer_Hash 使用哈希表来管理队列。 一般情况下以O(1)时间调度,取消和分配定时器。最坏需要O(n)。
ACE_Timer_List 绝对定时器链表,并按照递增的最后期限排序。 一般情况下和最坏情况下性能都是 O(n),但是使用内存最少。

«  4. Notification 通知时间派发处理   ::   目录   ::   6. 信号量的管理  »