NEURON
bbssrv2mpi.cpp
Go to the documentation of this file.
1 #include <../../nrnconf.h>
2 #include <nrnmpi.h>
3 #if NRNMPI // to end of file
4 #include <stdio.h>
5 #include <string.h>
6 #include <assert.h>
7 #include "bbssrv2mpi.h"
8 #include "bbssrv.h"
9 #include "bbsimpl.h"
10 #include "hocdec.h" //Printf
11 
12 #include "utils/logger.hpp"
13 
14 void nrnbbs_context_wait();
15 
17 
18 #include <map>
19 #include <set>
20 
21 #define debug 0
22 
// Rename the helper classes defined in this file to Mpi-prefixed names.
// NOTE(review): presumably this avoids clashing with same-named classes in
// another translation unit (e.g. bbslsrv.cpp) — confirm.
23 #define MessageList MpiMessageList
24 #define WorkItem MpiWorkItem
25 #define WorkList MpiWorkList
26 #define ReadyList MpiReadyList
27 #define ResultList MpiResultList
28 #define PendingList MpiPendingList
29 #define LookingToDoList MpiLookingToDoList
30 
31 class WorkItem {
32  public:
33  WorkItem(int id, bbsmpibuf* buf, int cid);
34  virtual ~WorkItem();
36  int id_;
37  bbsmpibuf* buf_;
38  int cid_; // mpi host id
39  bool todo_less_than(const WorkItem*) const;
40 };
41 
// Strict-weak ordering for C strings: s1 sorts before s2 when strcmp says so.
// Used as the comparator of the string-keyed multimaps in this file.
struct ltstr {
    bool operator()(const char* s1, const char* s2) const {
        const int order = strcmp(s1, s2);
        return order < 0;
    }
};
47 
48 struct ltWorkItem {
49  bool operator()(const WorkItem* w1, const WorkItem* w2) const {
50  return w1->todo_less_than(w2);
51  }
52 };
53 
// Return a heap copy of the NUL-terminated string s.
// The copy is allocated with new[]; callers in this file release it with
// delete[] once the owning map entry is erased.
static char* newstr(const char* s) {
    const size_t bytes = strlen(s) + 1;  // room for the terminator
    char* const dup = new char[bytes];
    return strcpy(dup, s);  // strcpy returns its destination
}
59 
60 WorkItem::WorkItem(int id, bbsmpibuf* buf, int cid) {
61 #if debug == 2
62  printf("WorkItem %d\n", id);
63 #endif
64  id_ = id;
65  buf_ = buf;
66  cid_ = cid;
67  parent_ = nullptr;
68 }
69 
71 #if debug
72  printf("~WorkItem %d\n", id_);
73 #endif
74 }
75 
76 bool WorkItem::todo_less_than(const WorkItem* w) const {
77  WorkItem* w1 = (WorkItem*) this;
78  WorkItem* w2 = (WorkItem*) w;
79  while (w1->parent_ != w2->parent_) {
80  if (w1->id_ < w2->id_) {
81  w2 = w2->parent_;
82  } else {
83  w1 = w1->parent_;
84  }
85  }
86 #if debug
87  printf("todo_less_than %d < %d return %d\n", this->id_, w->id_, w1->id_ < w2->id_);
88 #endif
89  return w1->id_ < w2->id_;
90 }
91 
// Posted messages: key string -> buffer, keys ordered with strcmp (ltstr).
92 class MessageList: public std::multimap<const char*, bbsmpibuf*, ltstr> {};
// Pending takes: key string -> mpi host id (cid) recorded by put_pending().
93 class PendingList: public std::multimap<const char*, const int, ltstr> {};
// Every known work item, indexed by its id_.
94 class WorkList: public std::map<int, const WorkItem*> {};
// Set of mpi host ids; used for both looking_todo_ and send_context_.
95 class LookingToDoList: public std::set<int> {};
// Work items not yet handed out, ordered by WorkItem::todo_less_than.
96 class ReadyList: public std::set<const WorkItem*, ltWorkItem> {};
// Finished work items keyed by parent id (0 when the item has no parent).
97 class ResultList: public std::multimap<int, const WorkItem*> {};
98 
100  messages_ = new MessageList();
101  work_ = new WorkList();
102  todo_ = new ReadyList();
103  results_ = new ResultList();
104  pending_ = new PendingList();
105  looking_todo_ = new LookingToDoList();
106  send_context_ = new LookingToDoList();
107  next_id_ = FIRSTID;
108  context_buf_ = nullptr;
110 }
111 
113  delete todo_;
114  delete results_;
115  delete looking_todo_;
116  printf("~BBSLocalServer not deleting everything\n");
117  // need to unref MessageValue in messages_ and delete WorkItem in work_
118  delete pending_;
119  delete messages_;
120  delete work_;
121  delete send_context_;
122 }
123 
124 bool BBSDirectServer::look_take(const char* key, bbsmpibuf** recv) {
125 #if debug
126  printf("DirectServer::look_take |%s|\n", key);
127 #endif
128  bool b = false;
129  nrnmpi_unref(*recv);
130  *recv = nullptr;
131  MessageList::iterator m = messages_->find(key);
132  if (m != messages_->end()) {
133  b = true;
134  *recv = (*m).second;
135  // printf("free %d\n", buf);
136  char* s = (char*) ((*m).first);
137  messages_->erase(m);
138  delete[] s;
139  }
140 #if debug
141  printf("DirectServer::look_take |%s| recv=%p return %d\n", key, (*recv), b);
142 #endif
143  return b;
144 }
145 
146 bool BBSDirectServer::look(const char* key, bbsmpibuf** recv) {
147 #if debug
148  printf("DirectServer::look |%s|\n", key);
149 #endif
150  bool b = false;
151  nrnmpi_unref(*recv);
152  *recv = nullptr;
153  MessageList::iterator m = messages_->find(key);
154  if (m != messages_->end()) {
155  b = true;
156  *recv = (*m).second;
157  if (*recv) {
158  nrnmpi_ref(*recv);
159  }
160  }
161 #if debug
162  printf("DirectServer::look |%s| recv=%p return %d\n", key, (*recv), b);
163 #endif
164  return b;
165 }
166 
167 void BBSDirectServer::put_pending(const char* key, int cid) {
168 #if debug
169  printf("put_pending |%s| %d\n", key, cid);
170 #endif
171  char* s = newstr(key);
172  pending_->emplace(s, cid);
173 }
174 
175 bool BBSDirectServer::take_pending(const char* key, int* cid) {
176  bool b = false;
177  PendingList::iterator p = pending_->find(key);
178  if (p != pending_->end()) {
179  *cid = (*p).second;
180 #if debug
181  printf("take_pending |%s| %d\n", key, *cid);
182 #endif
183  char* s = (char*) ((*p).first);
184  pending_->erase(p);
185  delete[] s;
186  b = true;
187  }
188  return b;
189 }
190 
191 void BBSDirectServer::post(const char* key, bbsmpibuf* send) {
192  int cid;
193 #if debug
194  printf("DirectServer::post |%s| send=%p\n", key, send);
195 #endif
196  if (take_pending(key, &cid)) {
197  nrnmpi_bbssend(cid, TAKE, send);
198  } else {
199  messages_->emplace(newstr(key), send);
200  nrnmpi_ref(send);
201  }
202 }
203 
204 void BBSDirectServer::add_looking_todo(int cid) {
205  looking_todo_->emplace(cid);
206 }
207 
208 void BBSDirectServer::post_todo(int pid, int cid, bbsmpibuf* send) {
209 #if debug
210  printf("BBSDirectServer::post_todo pid=%d cid=%d send=%p\n", pid, cid, send);
211 #endif
212  WorkItem* w = new WorkItem(next_id_++, send, cid);
213  nrnmpi_ref(send);
214  WorkList::iterator p = work_->find(pid);
215  if (p != work_->end()) {
216  w->parent_ = (WorkItem*) ((*p).second);
217  }
218  work_->emplace(w->id_, w);
219 #if debug
220  printf("work insert %d\n", w->id_);
221 #endif
222  LookingToDoList::iterator i = looking_todo_->begin();
223  if (i != looking_todo_->end()) {
224  cid = (*i);
225  looking_todo_->erase(i);
226  // the send buffer is correct
227  nrnmpi_bbssend(cid, w->id_ + 1, send);
228  } else {
229 #if debug
230  printf("todo insert\n");
231 #endif
232  todo_->emplace(w);
233  }
234 }
235 
236 void BBSDirectServer::context(bbsmpibuf* send) {
237  int cid, j;
238 #if debug
239  printf("numprocs_bbs=%d\n", nrnmpi_numprocs_bbs);
240 #endif
241  // Previous context may complete after allowing more activity
242  for (j = 0; j < 1000; ++j) {
243  if (remaining_context_cnt_ == 0) {
244  break;
245  }
246  handle();
247  }
248  if (remaining_context_cnt_ > 0) {
249  Printf("some workers did not receive previous context\n");
250  send_context_->erase(send_context_->begin(), send_context_->end());
251  nrnmpi_unref(context_buf_);
252  context_buf_ = nullptr;
253  }
255  for (j = 1; j < nrnmpi_numprocs_bbs; ++j) {
256  send_context_->emplace(j);
257  }
258  LookingToDoList::iterator i = looking_todo_->begin();
259  while (i != looking_todo_->end()) {
260  cid = (*i);
261  looking_todo_->erase(i);
262 #if debug
263  printf("sending context to already waiting %d\n", cid);
264 #endif
265  nrnmpi_bbssend(cid, CONTEXT + 1, send);
266  i = send_context_->find(cid);
267  send_context_->erase(i);
269  i = looking_todo_->begin();
270  }
271  if (remaining_context_cnt_ > 0) {
272  context_buf_ = send;
273  nrnmpi_ref(context_buf_);
274  handle();
275  }
276 }
277 
278 void nrnbbs_context_wait() {
279  if (BBSImpl::is_master_) {
281  }
282 }
283 
285  // printf("context_wait enter %d\n", remaining_context_cnt_);
286  while (remaining_context_cnt_) {
287  handle();
288  }
289  // printf("context_wait exit %d\n", remaining_context_cnt_);
290 }
291 
292 bool BBSDirectServer::send_context(int cid) {
293  LookingToDoList::iterator i = send_context_->find(cid);
294  if (i != send_context_->end()) {
295  send_context_->erase(i);
296 #if debug
297  printf("sending context to %d\n", cid);
298 #endif
299  nrnmpi_bbssend(cid, CONTEXT + 1, context_buf_);
300  if (--remaining_context_cnt_ <= 0) {
301  nrnmpi_unref(context_buf_);
302  context_buf_ = nullptr;
303  }
304  return true;
305  }
306  return false;
307 }
308 
309 void BBSDirectServer::post_result(int id, bbsmpibuf* send) {
310 #if debug
311  printf("DirectServer::post_result id=%d send=%p\n", id, send);
312 #endif
313  WorkList::iterator i = work_->find(id);
314  WorkItem* w = (WorkItem*) ((*i).second);
315  nrnmpi_ref(send);
316  nrnmpi_unref(w->buf_);
317  w->buf_ = send;
318  results_->emplace(w->parent_ ? w->parent_->id_ : 0, w);
319 }
320 
321 int BBSDirectServer::look_take_todo(bbsmpibuf** recv) {
322 #if debug
323  printf("DirectServer::look_take_todo\n");
324 #endif
325  nrnmpi_unref(*recv);
326  *recv = nullptr;
327  ReadyList::iterator i = todo_->begin();
328  if (i != todo_->end()) {
329  WorkItem* w = (WorkItem*) (*i);
330  todo_->erase(i);
331  *recv = w->buf_;
332 #if debug
333  printf("DirectServer::look_take_todo recv %p with keypos=%d return %d\n",
334  *recv,
335  (*recv)->keypos,
336  w->id_);
337 #endif
338  w->buf_ = 0;
339  return w->id_;
340  } else {
341  return 0;
342  }
343 }
344 
345 int BBSDirectServer::look_take_result(int pid, bbsmpibuf** recv) {
346 #if debug
347  printf("DirectServer::look_take_result pid=%d\n", pid);
348 #endif
349  nrnmpi_unref(*recv);
350  *recv = nullptr;
351  ResultList::iterator i = results_->find(pid);
352  if (i != results_->end()) {
353  WorkItem* w = (WorkItem*) ((*i).second);
354  results_->erase(i);
355  *recv = w->buf_;
356  int id = w->id_;
357  WorkList::iterator j = work_->find(id);
358  work_->erase(j);
359  delete w;
360 #if debug
361  printf("DirectServer::look_take_result recv=%p return %d\n", *recv, id);
362 #endif
363  return id;
364  } else {
365  return 0;
366  }
367 }
368 
369 #endif // NRNMPI
static char * newstr(const char *s)
Definition: bbslsrv.cpp:42
#define TAKE
Definition: bbssrv.h:7
#define FIRSTID
Definition: bbssrv.h:22
#define CONTEXT
Definition: bbssrv.h:18
int look_take_result(int parentid)
void add_looking_todo(int cid)
MpiReadyList * todo_
Definition: bbssrv2mpi.h:49
bool look(const char *key)
int context_buf_
Definition: bbslsrv2.h:35
void context(int ncid, int *cids)
bool look_take(const char *key)
MpiLookingToDoList * looking_todo_
Definition: bbssrv2mpi.h:48
static void handle()
void post_todo(int parentid, int cid)
void post(const char *key)
void put_pending(const char *key, int cid)
MpiLookingToDoList * send_context_
Definition: bbssrv2mpi.h:51
virtual ~BBSDirectServer()
static BBSDirectServer * server_
Definition: bbslsrv2.h:17
MpiMessageList * messages_
Definition: bbssrv2mpi.h:45
MpiPendingList * pending_
Definition: bbssrv2mpi.h:46
bool send_context(int cid)
bool take_pending(const char *key, int *cid)
int remaining_context_cnt_
Definition: bbslsrv2.h:36
void post_result(int id)
MpiWorkList * work_
Definition: bbssrv2mpi.h:47
MpiResultList * results_
Definition: bbssrv2mpi.h:50
static bool is_master_
Definition: bbsimpl.h:60
bool todo_less_than(const WorkItem *) const
Definition: bbslsrv.cpp:66
int id_
Definition: bbslsrv.cpp:25
WorkItem * parent_
Definition: bbslsrv.cpp:24
WorkItem(int id, MessageValue *)
Definition: bbslsrv.cpp:49
virtual ~WorkItem()
Definition: bbslsrv.cpp:59
#define key
Definition: tqueue.hpp:45
#define id
Definition: md1redef.h:41
#define i
Definition: md1redef.h:19
char buf[512]
Definition: init.cpp:13
static double map(void *v)
Definition: mlinedit.cpp:43
printf
Definition: extdef.h:5
size_t p
size_t j
w2
Definition: multisend.cpp:498
w1
Definition: multisend.cpp:497
s
Definition: multisend.cpp:521
int nrnmpi_numprocs_bbs
bool operator()(const WorkItem *w1, const WorkItem *w2) const
Definition: bbslsrv.cpp:37
bool operator()(const char *s1, const char *s2) const
Definition: bbslsrv.cpp:31
int Printf(const char *fmt, Args... args)
Definition: logger.hpp:18