本章目录
4. Notification 通知时间派发处理¶
4.1. 通知的作用¶
- 如果反应器 Reactor 的所有者线程的时间多路分离器函数已阻塞并等待IO时间的发生,反应器的通知机制赋予了其他线程将所有者唤醒的功能。
因为调用 notify 函数会触发notify_handler的读句柄的事件发生。
- 可以将一个时间处理器的指针和ACE_Reactor_Mask值中的一个传给 notify 方法,这些参数会触发 Reactor 分派对应的处理器的挂钩函数,
而无需将处理器与IO或者定时器处理事件关联起来。 就是不需要对处理器现行在 Reactor 上进行登记,搞了个特殊化,插队机制。
4.2. ACE_Select_Reactor_Notify 类继承图¶
ACE_Select_Reactor_T 类中定义的 notify_handler, 见 类主要成员变量, 默认类型为 ACE_Select_Reactor_Notify。
注解
在 ACE_Select_Reactor_T 的 open 函数的参数中一个参数用于控制 notify 是否采用 pipe 的参数 disable_notify_pipe, 默认值是 ACE_DISABLE_NOTIFY_PIPE_DEFAULT,宏定义是根据不同平台特性决定该值为0还是1 (需要进一步确认TODO)。
4.3. 通知与Select_Reactor 的分发集成¶
ace/Select_Reactor_T.cpp dispatch_notification_handlers 函数
template <class ACE_SELECT_REACTOR_TOKEN> int
ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch_notification_handlers
(ACE_Select_Reactor_Handle_Set &dispatch_set,
int &number_of_active_handles,
int &number_of_handlers_dispatched)
{
// Check to see if the ACE_HANDLE associated with the
// Select_Reactor's notify hook is enabled. If so, it means that
// one or more other threads are trying to update the
// ACE_Select_Reactor_T's internal tables or the notify pipe is
// enabled. We'll handle all these threads and notifications, and
// then break out to continue the event loop.
int const n =
this->notify_handler_->dispatch_notifications (number_of_active_handles,
dispatch_set.rd_mask_);
if (n == -1)
return -1;
else
{
number_of_handlers_dispatched += n;
number_of_active_handles -= n;
}
// Same as dispatch_timer_handlers
// No need to do anything with the state changed. That is because
// unbind already handles the case where someone unregister some
// kind of handle and unbind it. (::unbind calls the function
// state_changed () to reflect ant change with that)
// return this->state_changed_ ? -1 : 0;
return 0;
}
4.3.1. dispatch_notifications 函数¶
ace/Select_Reactor_Base.cpp dispatch_notifications 函数
int
ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles,
ACE_Handle_Set &rd_mask)
{
ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");
ACE_HANDLE const read_handle =
this->notification_pipe_.read_handle ();
if (read_handle != ACE_INVALID_HANDLE
&& rd_mask.is_set (read_handle))
{
--number_of_active_handles;
rd_mask.clr_bit (read_handle);
return this->handle_input (read_handle);
}
else
return 0;
}
4.3.2. handle_input 函数¶
ace/Select_Reactor_Base.cpp handle_input 函数
int
ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
{
ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
// Precondition: this->select_reactor_.token_.current_owner () ==
// ACE_Thread::self ();
int number_dispatched = 0;
int result = 0;
ACE_Notification_Buffer buffer;
// If there is only one buffer in the pipe, this will loop and call
// read_notify_pipe() twice. The first time will read the buffer, and
// the second will read the fact that the pipe is empty.
while ((result = this->read_notify_pipe (handle, buffer)) > 0)
{
// Dispatch the buffer
// NOTE: We count only if we made any dispatches ie. upcalls.
if (this->dispatch_notify (buffer) > 0)
++number_dispatched;
// Bail out if we've reached the <notify_threshold_>. Note that
// by default <notify_threshold_> is -1, so we'll loop until all
// the notifications in the pipe have been dispatched.
if (number_dispatched == this->max_notify_iterations_)
break;
}
// Reassign number_dispatched to -1 if things have gone seriously
// wrong.
if (result < 0)
number_dispatched = -1;
// Enqueue ourselves into the list of waiting threads. When we
// reacquire the token we'll be off and running again with ownership
// of the token. The postcondition of this call is that
// <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
this->select_reactor_->renew ();
return number_dispatched;
}
4.3.3. dispatch_notify 函数¶
ace/Select_Reactor_Base.cpp dispatch_notify 函数
int
ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
{
int result = 0;
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
// Dispatch one message from the notify queue, and put another in
// the pipe if one is available. Remember, the idea is to keep
// exactly one message in the pipe at a time.
bool more_messages_queued = false;
ACE_Notification_Buffer next;
result = notification_queue_.pop_next_notification(buffer,
more_messages_queued,
next);
if (result == 0 || result == -1)
{
return result;
}
if(more_messages_queued)
{
(void) ACE::send(this->notification_pipe_.write_handle(),
(char *)&next, sizeof(ACE_Notification_Buffer));
}
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
// If eh == 0 then another thread is unblocking the
// <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
// internal structures. Otherwise, we need to dispatch the
// appropriate handle_* method on the <ACE_Event_Handler> pointer
// we've been passed.
if (buffer.eh_ != 0)
{
ACE_Event_Handler *event_handler = buffer.eh_;
bool const requires_reference_counting =
event_handler->reference_counting_policy ().value () ==
ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
switch (buffer.mask_)
{
case ACE_Event_Handler::READ_MASK:
case ACE_Event_Handler::ACCEPT_MASK:
result = event_handler->handle_input (ACE_INVALID_HANDLE);
break;
case ACE_Event_Handler::WRITE_MASK:
result = event_handler->handle_output (ACE_INVALID_HANDLE);
break;
case ACE_Event_Handler::EXCEPT_MASK:
result = event_handler->handle_exception (ACE_INVALID_HANDLE);
break;
case ACE_Event_Handler::QOS_MASK:
result = event_handler->handle_qos (ACE_INVALID_HANDLE);
break;
case ACE_Event_Handler::GROUP_QOS_MASK:
result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
break;
default:
// Should we bail out if we get an invalid mask?
ACELIB_ERROR ((LM_ERROR,
ACE_TEXT ("invalid mask = %d\n"),
buffer.mask_));
}
if (result == -1)
event_handler->handle_close (ACE_INVALID_HANDLE,
ACE_Event_Handler::EXCEPT_MASK);
if (requires_reference_counting)
{
event_handler->remove_reference ();
}
}
return 1;
}
4.3.4. open函数¶
ace/Select_Reactor_Base.cpp open 函数
int
ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r,
ACE_Timer_Queue *,
int disable_notify_pipe)
{
ACE_TRACE ("ACE_Select_Reactor_Notify::open");
if (disable_notify_pipe == 0)
{
this->select_reactor_ = dynamic_cast<ACE_Select_Reactor_Impl *> (r);
if (select_reactor_ == 0)
{
errno = EINVAL;
return -1;
}
if (this->notification_pipe_.open () == -1)
return -1;
#if defined (F_SETFD)
ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1);
ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1);
#endif /* F_SETFD */
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
if (notification_queue_.open() == -1)
{
return -1;
}
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
// There seems to be a Win32 bug with this... Set this into
// non-blocking mode.
if (ACE::set_flags (this->notification_pipe_.read_handle (),
ACE_NONBLOCK) == -1)
return -1;
else
return this->select_reactor_->register_handler
(this->notification_pipe_.read_handle (),
this,
ACE_Event_Handler::READ_MASK);
}
else
{
this->select_reactor_ = 0;
return 0;
}
}
其中 notification_pipe_ 类型为 ACE_Pipe, 38-41行 则注册了 pipe 的 handler 的 READ_MASK 至 Reactor 中,及时接受相关通知消息。
4.4. 通知的调用接口¶
4.4.1. notify 函数¶
用于进行通知的调用方法。
ace/Select_Reactor_Base.cpp notify 函数
int
ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
ACE_Reactor_Mask mask,
ACE_Time_Value *timeout)
{
ACE_TRACE ("ACE_Select_Reactor_Notify::notify");
// Just consider this method a "no-op" if there's no
// <ACE_Select_Reactor> configured.
if (this->select_reactor_ == 0)
return 0;
ACE_Event_Handler_var safe_handler (event_handler);
if (event_handler)
{
event_handler->add_reference ();
}
ACE_Notification_Buffer buffer (event_handler, mask);
#if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
int const notification_required =
notification_queue_.push_new_notification(buffer);
if (notification_required == -1)
{
return -1;
}
if (notification_required == 0)
{
// No failures, the handler is now owned by the notification queue
safe_handler.release ();
return 0;
}
#endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
ssize_t const n = ACE::send (this->notification_pipe_.write_handle (),
(char *) &buffer,
sizeof buffer,
timeout);
if (n == -1)
{
return -1;
}
// No failures.
safe_handler.release ();
return 0;
}
4.5. 通知的注意事项¶
避免死锁
- 缺省机制下,反应器的通知机制是通过有界缓存来实现的,而 notify() 使用柱塞式发送调用将通知插入队列中。因此如果缓存区已经满,某个时间处理器的 handle_*() 函数又调用了 notify() ,就可能发生死锁。 避免方式可采用如下:
- 使用 notify() 函数时候指定超时时间
- 对应用进行设计,保证生成 notify() 调用的速度不会快于处理器处理的速度,这是最好的保护方式。
- 扩大 ACE_Select_Reactor 的通知机制
- 将通知机制的有界缓存替换成任意扩大的用户队列,通过 $ACE_ROOT/ace/config.h 文件中增加 #define ACE_HAS_REACTOR_NOTIFICATION_QUEUE 宏定义,然后重新编译,缺省情况下该特性没有被启用,因为高性能系统或者嵌入式系统中难以接受。
- 使用该用户空间可扩大队列,允许使用 ACE_Reactor::purge_pending_notifications() 方法扫描队列,并移除符合要求的时间处理器,避免已经销毁的时间处理器仍然挂在 notify 的队列中。