#include <condition_variable>
#define CACHELINE_ALLOC(name, type, size) \
    name = (type*) nrn_cacheline_alloc((void**) &name, size * sizeof(type))
#define CACHELINE_CALLOC(name, type, size) \
    name = (type*) nrn_cacheline_calloc((void**) &name, size, sizeof(type))
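// Illustration, not part of the NEURON sources: nrn_cacheline_calloc is assumed here to
// behave like calloc with cache-line alignment. A minimal self-contained analogue, assuming
// a 64-byte cache line and C++17 std::aligned_alloc (the real allocator lives elsewhere):
#include <cstdlib>
#include <cstring>
static void* example_cacheline_calloc(void** memptr, std::size_t nmemb, std::size_t size) {
    constexpr std::size_t line = 64;                               // assumed cache-line size
    std::size_t bytes = (nmemb * size + line - 1) / line * line;   // round up to whole lines
    *memptr = std::aligned_alloc(line, bytes);  // size must be a multiple of the alignment
    if (*memptr) {
        std::memset(*memptr, 0, bytes);         // calloc semantics: zero-fill
    }
    return *memptr;
}
// With such an allocator, CACHELINE_CALLOC(m_wc, worker_conf_t, nrn_nthread) leaves m_wc
// pointing at zero-initialised, cache-line-aligned storage for nrn_nthread worker_conf_t.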
static std::vector<std::pair<int, NrnThreadMembList*>> table_check_;

bool interpreter_locked{false};
std::unique_ptr<std::mutex> interpreter_lock;

enum struct worker_flag { execute_job, exit, wait };
struct worker_conf_t {
    // The job assigned to this worker: either nothing, a plain job, or a job that also
    // needs the sorted-model token.
    std::variant<std::monostate,
                 worker_job_t,
                 std::pair<worker_job_with_token_t, neuron::model_sorted_token const*>>
        job{};
    std::size_t thread_id{};
    worker_flag flag{worker_flag::wait};
    friend bool operator==(worker_conf_t const& lhs, worker_conf_t const& rhs) {
        return lhs.flag == rhs.flag && lhs.thread_id == rhs.thread_id && lhs.job == rhs.job;
    }
};
struct worker_kernel {
    worker_kernel(std::size_t thread_id)
        : m_thread_id{thread_id} {}
    void operator()(std::monostate const&) const {
        throw std::runtime_error("worker_kernel");
    }
    void operator()(worker_job_t job) const {
        job(nrn_threads + m_thread_id);
    }
    void operator()(
        std::pair<worker_job_with_token_t, neuron::model_sorted_token const*> const& pair) const {
        auto const& [job, token_ptr] = pair;
        job(*token_ptr, nrn_threads[m_thread_id]);
    }

  private:
    std::size_t m_thread_id{};
};
void worker_main(worker_conf_t* my_wc_ptr,
                 std::condition_variable* my_cond_ptr,
                 std::mutex* my_mut_ptr) {
    // Local references to the per-worker state shared with the coordinating thread.
    auto& cond{*my_cond_ptr};
    auto& mut{*my_mut_ptr};
    auto& wc{*my_wc_ptr};
    if (busywait_main_) {
        for (;;) {
            // Busy-wait until a job arrives or we are told to shut down.
            while (wc.flag == worker_flag::wait) {
                ;  // spin
            }
            if (wc.flag == worker_flag::exit) {
                return;
            }
            assert(wc.flag == worker_flag::execute_job);
            std::visit(worker_kernel{wc.thread_id}, wc.job);
            wc.flag = worker_flag::wait;
            wc.job = std::monostate{};
        }
    } else {
        worker_conf_t conf{};
        for (;;) {
            {
                // Wait until we have a job to execute or we have been told to shut down.
                std::unique_lock<std::mutex> lock{mut};
                cond.wait(lock, [&wc] { return wc.flag != worker_flag::wait; });
                assert(wc.flag == worker_flag::execute_job || wc.flag == worker_flag::exit);
                if (wc.flag == worker_flag::exit) {
                    return;
                }
                assert(wc.flag == worker_flag::execute_job);
                // Take a local copy of the job so the mutex is not held while it runs.
                conf = wc;
            }
            // Execute the job without holding the mutex.
            std::visit(worker_kernel{conf.thread_id}, conf.job);
            {
                // Declare ourselves idle again, unless the flag was changed in the meantime.
                std::lock_guard<std::mutex> _{mut};
                if (wc.flag == worker_flag::execute_job) {
                    wc.flag = worker_flag::wait;
                    wc.job = std::monostate{};
                }
            }
            // Wake up the coordinating thread, which may be waiting for us to finish.
            cond.notify_one();
        }
    }
}
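// Self-contained distillation of the handshake above (illustrative only, not NEURON code):
// one coordinator, one worker, and a std::function standing in for the variant job type.
// All demo names below are invented for this sketch; the real code copies the job out and
// releases the mutex before running it, which this simplified version does not do.
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

namespace demo {
enum struct flag_t { execute_job, exit, wait };
struct conf_t {
    std::function<void()> job{};
    flag_t flag{flag_t::wait};
};

inline void run_once() {
    conf_t wc{};
    std::mutex mut{};
    std::condition_variable cond{};
    std::thread worker{[&] {
        for (;;) {
            std::unique_lock<std::mutex> lock{mut};
            cond.wait(lock, [&] { return wc.flag != flag_t::wait; });
            if (wc.flag == flag_t::exit) {
                return;                // lock released when the unique_lock is destroyed
            }
            wc.job();                  // run the assigned job (lock held, unlike the real code)
            wc.flag = flag_t::wait;    // declare ourselves idle again
            wc.job = nullptr;
            lock.unlock();
            cond.notify_one();         // wake the coordinator's completion wait
        }
    }};
    {   // assign a job, mirroring worker_threads_t::assign_job
        std::unique_lock<std::mutex> lock{mut};
        cond.wait(lock, [&] { return wc.flag == flag_t::wait; });
        wc.job = [] { /* per-thread work would go here */ };
        wc.flag = flag_t::execute_job;
    }
    cond.notify_one();
    {   // wait for completion, mirroring worker_threads_t::wait
        std::unique_lock<std::mutex> lock{mut};
        cond.wait(lock, [&] { return wc.flag == flag_t::wait; });
    }
    {   // request shutdown, mirroring ~worker_threads_t
        std::lock_guard<std::mutex> _{mut};
        wc.flag = flag_t::exit;
    }
    cond.notify_one();
    worker.join();
}
}  // namespace demo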
// Pool of worker threads: entry 0 corresponds to the calling thread, entries
// 1..nrn_nthread-1 each get a worker_conf_t, a condition variable and a mutex.
struct worker_threads_t {
    worker_threads_t()
        : m_cond{std::make_unique<std::condition_variable[]>(nrn_nthread)}
        , m_mut{std::make_unique<std::mutex[]>(nrn_nthread)} {
        // Cache-line-aligned allocation does not run constructors, hence the placement new.
        CACHELINE_ALLOC(m_wc, worker_conf_t, nrn_nthread);
        // Entry 0 is a placeholder; its work is done by the calling thread.
        m_worker_threads.emplace_back();
        for (std::size_t i = 1; i < nrn_nthread; ++i) {
            new (m_wc + i) worker_conf_t{};
            m_wc[i].thread_id = i;
            m_worker_threads.emplace_back(worker_main, &(m_wc[i]), &(m_cond[i]), &(m_mut[i]));
        }
        if (!interpreter_lock) {
            interpreter_locked = false;
            interpreter_lock = std::make_unique<std::mutex>();
        }
    }

    ~worker_threads_t() {
        // Tell each worker to exit, then join it.
        for (std::size_t i = 1; i < m_worker_threads.size(); ++i) {
            {
                std::lock_guard<std::mutex> _{m_mut[i]};
                m_wc[i].flag = worker_flag::exit;
            }
            m_cond[i].notify_one();
            m_worker_threads[i].join();
        }
        if (interpreter_lock) {
            interpreter_lock.reset();
            interpreter_locked = false;
        }
        free(std::exchange(m_wc, nullptr));
    }
    // Hand a plain job to the given worker and mark it busy.
    void assign_job(std::size_t worker, worker_job_t job) {
        auto& cond = m_cond[worker];
        auto& wc = m_wc[worker];
        {
            std::unique_lock<std::mutex> lock{m_mut[worker]};
            // Wait until the worker is idle.
            cond.wait(lock, [&wc] { return wc.flag == worker_flag::wait; });
            assert(std::holds_alternative<std::monostate>(wc.job));
            wc.job = job;
            wc.flag = worker_flag::execute_job;
        }
        cond.notify_one();
    }

    // Hand a job that also needs the sorted-model token to the given worker.
    void assign_job(std::size_t worker,
                    neuron::model_sorted_token const& cache_token,
                    worker_job_with_token_t job) {
        auto& cond = m_cond[worker];
        auto& wc = m_wc[worker];
        {
            std::unique_lock<std::mutex> lock{m_mut[worker]};
            // Wait until the worker is idle.
            cond.wait(lock, [&wc] { return wc.flag == worker_flag::wait; });
            assert(std::holds_alternative<std::monostate>(wc.job));
            wc.job = std::make_pair(job, &cache_token);
            wc.flag = worker_flag::execute_job;
        }
        cond.notify_one();
    }
    // Wait until all worker threads have finished their assigned jobs.
    void wait() const {
        for (std::size_t i = 1; i < nrn_nthread; ++i) {
            auto& wc{m_wc[i]};
            if (busywait_) {
                while (wc.flag != worker_flag::wait) {
                    ;  // spin until the worker declares itself idle
                }
            } else {
                std::unique_lock<std::mutex> lock{m_mut[i]};
                m_cond[i].wait(lock, [&wc] { return wc.flag == worker_flag::wait; });
            }
        }
    }

    std::size_t num_workers() const {
        return m_worker_threads.size();
    }

  private:
    // std::condition_variable and std::mutex are neither copyable nor movable, so plain
    // heap arrays are used instead of std::vector.
    std::unique_ptr<std::condition_variable[]> m_cond;
    std::unique_ptr<std::mutex[]> m_mut;
    std::vector<std::thread> m_worker_threads;
    worker_conf_t* m_wc{};
};
std::unique_ptr<worker_threads_t> worker_threads{};
    worker_threads.reset();
    for (i = 0; i < n; ++i) {
#if NRN_ENABLE_THREADS
    if (parallel != static_cast<bool>(worker_threads)) {
        worker_threads.reset();
        // ...
        printf("This MPI is not threadsafe so threads are disabled.\n");
        // ...
        worker_threads = std::make_unique<worker_threads_t>();
    for (tml = nt->tml; tml; tml = tml2) {
        // ...
        free((char*) std::exchange(ml->nodelist, nullptr));
        free((char*) std::exchange(ml->nodeindices, nullptr));
        delete[] std::exchange(ml->prop, nullptr);
        // ...
        free((char*) std::exchange(ml->pdata, nullptr));
        // ...
        delete[] std::exchange(ml->_thread, nullptr);
        // ...
        delete std::exchange(ml, nullptr);
        free((char*) std::exchange(tml, nullptr));
    }
    // ...
    free((char*) std::exchange(nt->_ml_list, nullptr));
    // ...
    for (tbl = nt->tbl[i]; tbl; tbl = tbl2) {
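// The free(std::exchange(ptr, nullptr)) idiom used above passes the old pointer to
// free()/delete and leaves the owning field null, so a repeated cleanup pass cannot
// double-free. Self-contained illustration (demo_owner is invented for this sketch):
#include <cstdlib>
#include <utility>
struct demo_owner {
    int* data{};
    void cleanup() {
        std::free(std::exchange(data, nullptr));  // safe to call more than once
    }
};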
    printf("thread_memblist_setup %lx v_node_count=%d ncell=%d end=%d\n",
           (long) nth,
           v_node_count,
           nth->ncell,
           nth->end);
    for (i = 0; i < _nt->end; ++i) {
        // ...
        for (p = nd->prop; p; p = p->next) {
    for (tml = _nt->tml; tml; tml = tml->next) {
    // ...
    for (i = 0; i < _nt->end; ++i) {
        // ...
        for (p = nd->prop; p; p = p->next) {
    for (i = 0; i < _nt->end; ++i) {
    // ...
    for (i = 0; i < _nt->end; ++i) {
    // ...
    for (tml = _nt->tml; tml; tml = tml->next) {
        if (!bamap[bam->type]) {
            bamap[bam->type] = bam;
        }
    // ...
    for (tml = _nt->tml; tml; tml = tml->next) {
        if (bamap[tml->index]) {
            int mtype = tml->index;
            // ...
            for (bam = bamap[mtype]; bam && bam->type == mtype; bam = bam->next) {
    // ...
    for (tml = _nt->tml; tml; tml = tml->next) {
    // ...
    (*nrn_mk_transfer_thread_data_)();
    std::queue<Section*> que;
    // ...
    while (!que.empty()) {
        // ...
        nd = sec->parentnode;
        // ...
        _nt->_v_parent[inode] = nullptr;
    // ...
    std::queue<Section*> que;
    // ...
    while (!que.empty()) {
        // ...
        for (int j = 0; j < sec->nnode; ++j) {
            // ...
            _nt->_v_parent[inode] = sec->pnode[j - 1];
            // ...
            _nt->_v_parent[inode] = sec->parentnode;
            // ...
            _nt->_v_node[inode]->v_node_index = inode;
    // ...
    assert(_nt->end == inode);
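// The ordering pass above walks the sections breadth-first with a std::queue so that every
// node receives a contiguous index and every parent index precedes its children. A
// self-contained distillation over a toy tree (demo_node and demo_levelorder are invented
// for this sketch, not NEURON types):
#include <cassert>
#include <queue>
#include <vector>

struct demo_node {
    std::vector<demo_node*> children{};
    int index{-1};  // filled in by the traversal
};

inline void demo_levelorder(demo_node& root) {
    std::vector<demo_node*> order;  // plays the role of _nt->_v_node
    std::vector<int> parent;        // plays the role of _nt->_v_parent, as indices
    std::queue<demo_node*> que;
    que.push(&root);
    parent.push_back(-1);           // the root has no parent
    while (!que.empty()) {
        demo_node* nd = que.front();
        que.pop();
        nd->index = static_cast<int>(order.size());
        order.push_back(nd);
        for (demo_node* child: nd->children) {
            que.push(child);
            parent.push_back(nd->index);  // parents are always indexed before their children
        }
    }
    assert(order.size() == parent.size());
}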
    for (inode = 0; inode < _nt->end; ++inode) {
        _nt->_v_node[inode]->_classical_parent = _nt->_v_parent[inode];
    }
    // ...
    (*nrn_multisplit_setup_)();
        int index = tml->index;
    // ...
        int index = tml->index;
    // ...
        memb_func[tml->index].thread_table_check_(
#if NRN_ENABLE_THREADS
    // ...
    interpreter_lock->lock();
    interpreter_locked = true;
// ...
#if NRN_ENABLE_THREADS
    if (interpreter_locked) {
        interpreter_locked = false;
        interpreter_lock->unlock();
#if NRN_ENABLE_THREADS
    if (worker_threads) {
        // ...
        worker_threads->assign_job(i, job);
        // ...
        worker_threads->wait();
// ...
#if NRN_ENABLE_THREADS
    if (worker_threads) {
        // ...
        worker_threads->assign_job(i, cache_token, job);
        // ...
        worker_threads->wait();
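// Conceptual shape of nrn_multithread_job as a self-contained analogue (not NEURON code):
// hand the job to workers 1..nthread-1, run thread 0's share on the calling thread, then
// wait for everyone. The real implementation reuses the worker_threads_t pool above rather
// than spawning fresh std::threads; the demo names below are invented for this sketch.
#include <cstddef>
#include <thread>
#include <vector>

using demo_job_t = void (*)(std::size_t thread_id);

inline void demo_multithread_job(demo_job_t job, std::size_t nthread) {
    std::vector<std::thread> workers;
    for (std::size_t i = 1; i < nthread; ++i) {
        workers.emplace_back(job, i);  // analogue of worker_threads->assign_job(i, job)
    }
    job(0);                            // thread 0's share runs on the calling thread
    for (auto& t: workers) {
        t.join();                      // analogue of worker_threads->wait()
    }
}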
#if NRN_ENABLE_THREADS
    if (worker_threads) {
        // ...
        worker_threads->assign_job(i, job);
        worker_threads->wait();
// ...
#if NRN_ENABLE_THREADS
    if (worker_threads) {
        worker_threads->wait();
    hoc_execerror("some threads have a user defined partition", "and some do not");
        sec->volatile_mark = 0;
    // ...
        if (sec->parentsec) {
            Sprintf(buf, "in thread partition %d is not a root section", it);
        // ...
        if (sec->volatile_mark) {
            Sprintf(buf, "appeared again in partition %d", it);
        // ...
        sec->volatile_mark = 1;
    // ...
        "The total number of cells, %d, is different than the number of user partition "
#if NRN_ENABLE_THREADS
    // ...
    worker_threads->wait();
#if NRN_ENABLE_THREADS
    // ...
    return std::thread::hardware_concurrency();
// ...
    return worker_threads.get() ? worker_threads->num_workers() : 0;
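// Illustration (not NEURON code): a caller might combine the two queries above to pick a
// thread count, capping a requested value by the hardware concurrency that
// nrn_how_many_processors() reports. demo_pick_nthread is invented for this sketch.
#include <algorithm>
#include <thread>

inline unsigned demo_pick_nthread(unsigned requested) {
    unsigned hw = std::thread::hardware_concurrency();  // may report 0 if unknown
    if (hw == 0) {
        hw = 1;
    }
    return std::max(1u, std::min(requested, hw));
}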
    if (node_data.field_active<Tag>()) {
    // ...
    if (node_data.field_active<Tag>()) {