ACE Study 0.1 文档

4. Notification 通知时间派发处理

«  3. I/O Handler的管理   ::   目录   ::   5. 定时器管理  »

本章目录

4. Notification 通知时间派发处理

4.1. 通知的作用

  • 如果反应器 Reactor 的所有者线程的时间多路分离器函数已阻塞并等待IO时间的发生,反应器的通知机制赋予了其他线程将所有者唤醒的功能。

    因为调用 notify 函数会触发notify_handler的读句柄的事件发生。

  • 可以将一个时间处理器的指针和ACE_Reactor_Mask值中的一个传给 notify 方法,这些参数会触发 Reactor 分派对应的处理器的挂钩函数,

    而无需将处理器与IO或者定时器处理事件关联起来。 就是不需要对处理器现行在 Reactor 上进行登记,搞了个特殊化,插队机制。

4.2. ACE_Select_Reactor_Notify 类继承图

ACE_Select_Reactor_T 类中定义的 notify_handler, 见 类主要成员变量, 默认类型为 ACE_Select_Reactor_Notify

ACE_Select_Reactor_Notify

注解

ACE_Select_Reactor_Topen 函数的参数中一个参数用于控制 notify 是否采用 pipe 的参数 disable_notify_pipe, 默认值是 ACE_DISABLE_NOTIFY_PIPE_DEFAULT,宏定义是根据不同平台特性决定该值为0还是1 (需要进一步确认TODO)。

4.3. 通知与Select_Reactor 的分发集成

ace/Select_Reactor_T.cpp dispatch_notification_handlers 函数

template <class ACE_SELECT_REACTOR_TOKEN> int
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch_notification_handlers
  (ACE_Select_Reactor_Handle_Set &dispatch_set,
   int &number_of_active_handles,
   int &number_of_handlers_dispatched)
{
  // Check to see if the ACE_HANDLE associated with the
  // Select_Reactor's notify hook is enabled.  If so, it means that
  // one or more other threads are trying to update the
  // ACE_Select_Reactor_T's internal tables or the notify pipe is
  // enabled.  We'll handle all these threads and notifications, and
  // then break out to continue the event loop.
  int const n =
    this->notify_handler_->dispatch_notifications (number_of_active_handles,
                                                   dispatch_set.rd_mask_);

  if (n == -1)
    return -1;
  else
    {
      number_of_handlers_dispatched += n;
      number_of_active_handles -= n;
    }

  // Same as dispatch_timer_handlers
  // No need to do anything with the state changed. That is because
  // unbind already handles the case where someone unregister some
  // kind of handle and unbind it. (::unbind calls the function
  // state_changed ()  to reflect ant change with that)
  //   return this->state_changed_ ? -1 : 0;
  return 0;
}

4.3.1. dispatch_notifications 函数

ace/Select_Reactor_Base.cpp dispatch_notifications 函数

int
ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles,
                                                   ACE_Handle_Set &rd_mask)
{
  ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");

  ACE_HANDLE const read_handle =
    this->notification_pipe_.read_handle ();

  if (read_handle != ACE_INVALID_HANDLE
      && rd_mask.is_set (read_handle))
    {
      --number_of_active_handles;
      rd_mask.clr_bit (read_handle);
      return this->handle_input (read_handle);
    }
  else
    return 0;
}

4.3.2. handle_input 函数

ace/Select_Reactor_Base.cpp handle_input 函数

int
ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
{
  ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
  // Precondition: this->select_reactor_.token_.current_owner () ==
  // ACE_Thread::self ();

  int number_dispatched = 0;
  int result = 0;
  ACE_Notification_Buffer buffer;

  // If there is only one buffer in the pipe, this will loop and call
  // read_notify_pipe() twice.  The first time will read the buffer, and
  // the second will read the fact that the pipe is empty.
  while ((result = this->read_notify_pipe (handle, buffer)) > 0)
    {
      // Dispatch the buffer
      // NOTE: We count only if we made any dispatches ie. upcalls.
      if (this->dispatch_notify (buffer) > 0)
        ++number_dispatched;

      // Bail out if we've reached the <notify_threshold_>.  Note that
      // by default <notify_threshold_> is -1, so we'll loop until all
      // the notifications in the pipe have been dispatched.
      if (number_dispatched == this->max_notify_iterations_)
        break;
    }

  // Reassign number_dispatched to -1 if things have gone seriously
  // wrong.
  if (result < 0)
    number_dispatched = -1;

  // Enqueue ourselves into the list of waiting threads.  When we
  // reacquire the token we'll be off and running again with ownership
  // of the token.  The postcondition of this call is that
  // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
  this->select_reactor_->renew ();
  return number_dispatched;
}

4.3.3. dispatch_notify 函数

ace/Select_Reactor_Base.cpp dispatch_notify 函数

int
ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
{
  int result = 0;

#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
  // Dispatch one message from the notify queue, and put another in
  // the pipe if one is available.  Remember, the idea is to keep
  // exactly one message in the pipe at a time.

  bool more_messages_queued = false;
  ACE_Notification_Buffer next;

  result = notification_queue_.pop_next_notification(buffer,
                                                     more_messages_queued,
                                                     next);

  if (result == 0 || result == -1)
    {
      return result;
    }

  if(more_messages_queued)
    {
      (void) ACE::send(this->notification_pipe_.write_handle(),
            (char *)&next, sizeof(ACE_Notification_Buffer));
    }
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */

  // If eh == 0 then another thread is unblocking the
  // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
  // internal structures.  Otherwise, we need to dispatch the
  // appropriate handle_* method on the <ACE_Event_Handler> pointer
  // we've been passed.
  if (buffer.eh_ != 0)
    {
      ACE_Event_Handler *event_handler = buffer.eh_;

      bool const requires_reference_counting =
        event_handler->reference_counting_policy ().value () ==
        ACE_Event_Handler::Reference_Counting_Policy::ENABLED;

      switch (buffer.mask_)
        {
        case ACE_Event_Handler::READ_MASK:
        case ACE_Event_Handler::ACCEPT_MASK:
          result = event_handler->handle_input (ACE_INVALID_HANDLE);
          break;
        case ACE_Event_Handler::WRITE_MASK:
          result = event_handler->handle_output (ACE_INVALID_HANDLE);
          break;
        case ACE_Event_Handler::EXCEPT_MASK:
          result = event_handler->handle_exception (ACE_INVALID_HANDLE);
          break;
        case ACE_Event_Handler::QOS_MASK:
          result = event_handler->handle_qos (ACE_INVALID_HANDLE);
          break;
        case ACE_Event_Handler::GROUP_QOS_MASK:
          result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
          break;
        default:
          // Should we bail out if we get an invalid mask?
          ACELIB_ERROR ((LM_ERROR,
                      ACE_TEXT ("invalid mask = %d\n"),
                      buffer.mask_));
        }

      if (result == -1)
        event_handler->handle_close (ACE_INVALID_HANDLE,
                                     ACE_Event_Handler::EXCEPT_MASK);

      if (requires_reference_counting)
        {
          event_handler->remove_reference ();
        }
    }

  return 1;
}

4.3.4. open函数

ace/Select_Reactor_Base.cpp open 函数

int
ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r,
                                 ACE_Timer_Queue *,
                                 int disable_notify_pipe)
{
  ACE_TRACE ("ACE_Select_Reactor_Notify::open");

  if (disable_notify_pipe == 0)
    {
      this->select_reactor_ = dynamic_cast<ACE_Select_Reactor_Impl *> (r);

      if (select_reactor_ == 0)
        {
          errno = EINVAL;
          return -1;
        }

      if (this->notification_pipe_.open () == -1)
        return -1;
#if defined (F_SETFD)
      ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
      ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
#endif /* F_SETFD */

#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
      if (notification_queue_.open() == -1)
        {
          return -1;
        }
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */

      // There seems to be a Win32 bug with this...  Set this into
      // non-blocking mode.
      if (ACE::set_flags (this->notification_pipe_.read_handle (),
                          ACE_NONBLOCK) == -1)
        return -1;
      else
        return this->select_reactor_->register_handler
          (this->notification_pipe_.read_handle (),
           this,
           ACE_Event_Handler::READ_MASK);
    }
  else
    {
      this->select_reactor_ = 0;
      return 0;
    }
}

其中 notification_pipe_ 类型为 ACE_Pipe, 38-41行 则注册了 pipe 的 handler 的 READ_MASK 至 Reactor 中,及时接受相关通知消息。

4.4. 通知的调用接口

4.4.1. notify 函数

用于进行通知的调用方法。

ace/Select_Reactor_Base.cpp notify 函数

int
ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
                                   ACE_Reactor_Mask mask,
                                   ACE_Time_Value *timeout)
{
  ACE_TRACE ("ACE_Select_Reactor_Notify::notify");

  // Just consider this method a "no-op" if there's no
  // <ACE_Select_Reactor> configured.
  if (this->select_reactor_ == 0)
    return 0;

  ACE_Event_Handler_var safe_handler (event_handler);

  if (event_handler)
    {
      event_handler->add_reference ();
    }

  ACE_Notification_Buffer buffer (event_handler, mask);

#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
  int const notification_required =
    notification_queue_.push_new_notification(buffer);

  if (notification_required == -1)
    {
      return -1;
    }

  if (notification_required == 0)
    {
      // No failures, the handler is now owned by the notification queue
      safe_handler.release ();

      return 0;
    }
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */

  ssize_t const n = ACE::send (this->notification_pipe_.write_handle (),
                               (char *) &buffer,
                               sizeof buffer,
                               timeout);
  if (n == -1)
    {
      return -1;
    }

  // No failures.
  safe_handler.release ();

  return 0;
}

4.5. 通知的注意事项

避免死锁

  1. 缺省机制下,反应器的通知机制是通过有界缓存来实现的,而 notify() 使用柱塞式发送调用将通知插入队列中。因此如果缓存区已经满,某个时间处理器的 handle_*() 函数又调用了 notify() ,就可能发生死锁。 避免方式可采用如下:
  • 使用 notify() 函数时候指定超时时间
  • 对应用进行设计,保证生成 notify() 调用的速度不会快于处理器处理的速度,这是最好的保护方式。
  1. 扩大 ACE_Select_Reactor 的通知机制
  • 将通知机制的有界缓存替换成任意扩大的用户队列,通过 $ACE_ROOT/ace/config.h 文件中增加 #define ACE_HAS_REACTOR_NOTIFICATION_QUEUE 宏定义,然后重新编译,缺省情况下该特性没有被启用,因为高性能系统或者嵌入式系统中难以接受。
  • 使用该用户空间可扩大队列,允许使用 ACE_Reactor::purge_pending_notifications() 方法扫描队列,并移除符合要求的时间处理器,避免已经销毁的时间处理器仍然挂在 notify 的队列中。

«  3. I/O Handler的管理   ::   目录   ::   5. 定时器管理  »