// Copyright (C) 2014 Ian Forbed // Copyright (C) 2014 Vicente J. Botet Escriba // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #ifndef BOOST_THREAD_SYNC_TIMED_QUEUE_HPP #define BOOST_THREAD_SYNC_TIMED_QUEUE_HPP #include #include #include #include #include #include #include namespace boost { namespace concurrent { namespace detail { template struct scheduled_type { typedef T value_type; typedef Clock clock; typedef typename clock::time_point time_point; T data; time_point time; BOOST_THREAD_COPYABLE_AND_MOVABLE(scheduled_type) scheduled_type(T const& pdata, time_point tp) : data(pdata), time(tp) {} scheduled_type(BOOST_THREAD_RV_REF(T) pdata, time_point tp) : data(boost::move(pdata)), time(tp) {} scheduled_type(scheduled_type const& other) : data(other.data), time(other.time) {} scheduled_type& operator=(BOOST_THREAD_COPY_ASSIGN_REF(scheduled_type) other) { data = other.data; time = other.time; return *this; } scheduled_type(BOOST_THREAD_RV_REF(scheduled_type) other) : data(boost::move(other.data)), time(other.time) {} scheduled_type& operator=(BOOST_THREAD_RV_REF(scheduled_type) other) { data = boost::move(other.data); time = other.time; return *this; } bool time_not_reached() const { return time > clock::now(); } bool operator <(const scheduled_type other) const { return this->time > other.time; } }; //end struct } //end detail namespace template class sync_timed_queue : private sync_priority_queue > { typedef detail::scheduled_type stype; typedef sync_priority_queue super; public: typedef T value_type; typedef Clock clock; typedef typename clock::duration duration; typedef typename clock::time_point time_point; typedef typename super::underlying_queue_type underlying_queue_type; typedef typename super::size_type size_type; typedef typename super::op_status op_status; sync_timed_queue() : super() {}; ~sync_timed_queue() {} using super::size; using super::empty; using super::full; using super::close; using super::closed; T pull(); void pull(T& elem); template queue_op_status pull_until(chrono::time_point const& tp, T& elem); template queue_op_status pull_for(chrono::duration const& dura, T& elem); queue_op_status try_pull(T& elem); queue_op_status wait_pull(T& elem); queue_op_status nonblocking_pull(T& elem); template void push(const T& elem, chrono::time_point const& tp); template void push(const T& elem, chrono::duration const& dura); template void push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point const& tp); template void push(BOOST_THREAD_RV_REF(T) elem, chrono::duration const& dura); template queue_op_status try_push(const T& elem, chrono::time_point const& tp); template queue_op_status try_push(const T& elem, chrono::duration const& dura); template queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point const& tp); template queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration const& dura); private: T pull(unique_lock&); T pull(lock_guard&); void pull(unique_lock&, T& elem); void pull(lock_guard&, T& elem); queue_op_status try_pull(unique_lock&, T& elem); queue_op_status try_pull(lock_guard&, T& elem); queue_op_status wait_pull(unique_lock& lk, T& elem); bool wait_until_not_empty_time_reached_or_closed(unique_lock&); T pull_when_time_reached(unique_lock&); template queue_op_status pull_when_time_reached_until(unique_lock&, chrono::time_point const& tp, T& elem); bool time_not_reached(unique_lock&); bool time_not_reached(lock_guard&); bool empty_or_time_not_reached(unique_lock&); bool empty_or_time_not_reached(lock_guard&); sync_timed_queue(const sync_timed_queue&); sync_timed_queue& operator=(const sync_timed_queue&); sync_timed_queue(BOOST_THREAD_RV_REF(sync_timed_queue)); sync_timed_queue& operator=(BOOST_THREAD_RV_REF(sync_timed_queue)); }; //end class template template void sync_timed_queue::push(const T& elem, chrono::time_point const& tp) { super::push(stype(elem,tp)); } template template void sync_timed_queue::push(const T& elem, chrono::duration const& dura) { push(elem, clock::now() + dura); } template template void sync_timed_queue::push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point const& tp) { super::push(stype(boost::move(elem),tp)); } template template void sync_timed_queue::push(BOOST_THREAD_RV_REF(T) elem, chrono::duration const& dura) { push(boost::move(elem), clock::now() + dura); } template template queue_op_status sync_timed_queue::try_push(const T& elem, chrono::time_point const& tp) { return super::try_push(stype(elem,tp)); } template template queue_op_status sync_timed_queue::try_push(const T& elem, chrono::duration const& dura) { return try_push(elem,clock::now() + dura); } template template queue_op_status sync_timed_queue::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point const& tp) { return super::try_push(stype(boost::move(elem), tp)); } template template queue_op_status sync_timed_queue::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration const& dura) { return try_push(boost::move(elem), clock::now() + dura); } /////////////////////////// template bool sync_timed_queue::time_not_reached(unique_lock&) { return super::data_.top().time_not_reached(); } template bool sync_timed_queue::time_not_reached(lock_guard&) { return super::data_.top().time_not_reached(); } /////////////////////////// template bool sync_timed_queue::wait_until_not_empty_time_reached_or_closed(unique_lock& lk) { for (;;) { if (super::closed(lk)) return true; while (! super::empty(lk)) { if (! time_not_reached(lk)) return false; super::not_empty_.wait_until(lk, super::data_.top().time); if (super::closed(lk)) return true; } if (super::closed(lk)) return true; super::not_empty_.wait(lk); } return false; } /////////////////////////// template T sync_timed_queue::pull_when_time_reached(unique_lock& lk) { while (time_not_reached(lk)) { super::throw_if_closed(lk); super::not_empty_.wait_until(lk,super::data_.top().time); super::wait_until_not_empty(lk); } return pull(lk); } template template queue_op_status sync_timed_queue::pull_when_time_reached_until(unique_lock& lk, chrono::time_point const& tp, T& elem) { chrono::time_point tpmin = (tp < super::data_.top().time) ? tp : super::data_.top().time; while (time_not_reached(lk)) { super::throw_if_closed(lk); if (queue_op_status::timeout == super::not_empty_.wait_until(lk, tpmin)) { if (time_not_reached(lk)) return queue_op_status::not_ready; return queue_op_status::timeout; } } pull(lk, elem); return queue_op_status::success; } /////////////////////////// template bool sync_timed_queue::empty_or_time_not_reached(unique_lock& lk) { if ( super::empty(lk) ) return true; if ( time_not_reached(lk) ) return true; return false; } template bool sync_timed_queue::empty_or_time_not_reached(lock_guard& lk) { if ( super::empty(lk) ) return true; if ( time_not_reached(lk) ) return true; return false; } /////////////////////////// template T sync_timed_queue::pull(unique_lock&) { #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES return boost::move(super::data_.pull().data); #else return super::data_.pull().data; #endif } template T sync_timed_queue::pull(lock_guard&) { #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES return boost::move(super::data_.pull().data); #else return super::data_.pull().data; #endif } template T sync_timed_queue::pull() { unique_lock lk(super::mtx_); super::wait_until_not_empty(lk); return pull_when_time_reached(lk); } /////////////////////////// template void sync_timed_queue::pull(unique_lock&, T& elem) { #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES elem = boost::move(super::data_.pull().data); #else elem = super::data_.pull().data; #endif } template void sync_timed_queue::pull(lock_guard&, T& elem) { #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES elem = boost::move(super::data_.pull().data); #else elem = super::data_.pull().data; #endif } template void sync_timed_queue::pull(T& elem) { unique_lock lk(super::mtx_); super::wait_until_not_empty(lk); elem = pull_when_time_reached(lk); } ////////////////////// template template queue_op_status sync_timed_queue::pull_until(chrono::time_point const& tp, T& elem) { unique_lock lk(super::mtx_); if (queue_op_status::timeout == super::wait_until_not_empty_until(lk, tp)) return queue_op_status::timeout; return pull_when_time_reached_until(lk, tp, elem); } ////////////////////// template template queue_op_status sync_timed_queue::pull_for(chrono::duration const& dura, T& elem) { return pull_until(clock::now() + dura, elem); } /////////////////////////// template queue_op_status sync_timed_queue::try_pull(unique_lock& lk, T& elem) { if ( super::empty(lk) ) { if (super::closed(lk)) return queue_op_status::closed; return queue_op_status::empty; } if ( time_not_reached(lk) ) { if (super::closed(lk)) return queue_op_status::closed; return queue_op_status::not_ready; } pull(lk, elem); return queue_op_status::success; } template queue_op_status sync_timed_queue::try_pull(lock_guard& lk, T& elem) { if ( super::empty(lk) ) { if (super::closed(lk)) return queue_op_status::closed; return queue_op_status::empty; } if ( time_not_reached(lk) ) { if (super::closed(lk)) return queue_op_status::closed; return queue_op_status::not_ready; } pull(lk, elem); return queue_op_status::success; } template queue_op_status sync_timed_queue::try_pull(T& elem) { lock_guard lk(super::mtx_); return try_pull(lk, elem); } /////////////////////////// template queue_op_status sync_timed_queue::wait_pull(unique_lock& lk, T& elem) { if (super::empty(lk)) { if (super::closed(lk)) return queue_op_status::closed; } bool has_been_closed = wait_until_not_empty_time_reached_or_closed(lk); if (has_been_closed) return queue_op_status::closed; pull(lk, elem); return queue_op_status::success; } template queue_op_status sync_timed_queue::wait_pull(T& elem) { unique_lock lk(super::mtx_); return wait_pull(lk, elem); } // /////////////////////////// // template // queue_op_status sync_timed_queue::wait_pull(unique_lock &lk, T& elem) // { // if (super::empty(lk)) // { // if (super::closed(lk)) return queue_op_status::closed; // } // bool has_been_closed = super::wait_until_not_empty_or_closed(lk); // if (has_been_closed) return queue_op_status::closed; // pull(lk, elem); // return queue_op_status::success; // } // template // queue_op_status sync_timed_queue::wait_pull(T& elem) // { // unique_lock lk(super::mtx_); // return wait_pull(lk, elem); // } /////////////////////////// template queue_op_status sync_timed_queue::nonblocking_pull(T& elem) { unique_lock lk(super::mtx_, try_to_lock); if (! lk.owns_lock()) return queue_op_status::busy; return try_pull(lk, elem); } } //end concurrent namespace using concurrent::sync_timed_queue; } //end boost namespace #include #endif