NEURON
multisend.cpp
Go to the documentation of this file.
1 /*
2 # =============================================================================
3 # Copyright (c) 2016 - 2021 Blue Brain Project/EPFL
4 #
5 # See top-level LICENSE file for details.
6 # =============================================================================
7 */
8 
14 
15 /*
16 Overall exchange strategy
17 
18 When a cell spikes, it immediately does a multisend of
19 (int gid, double spiketime) to all the target machines that have
20 cells that need to receive this spike by spiketime + delay.
21 The MPI implementation does not block due to use of MPI_Isend.
22 
23 In order to minimize the number of nrnmpi_multisend_conserve tests
24 (and potentially abandon them altogether if I can ever guarantee
25 that exchange time is less than half the computation time), I divide the
26 minimum delay integration intervals into two equal subintervals.
27 So if a spike is generated in an even subinterval, I do not have
28 to include it in the conservation check until the end of the next even
29 subinterval.
30 
31 When a spike is received (generally MPI_Iprobe, MPI_Recv) it is placed in
32 even or odd buffers (depending on whether the coded gid is positive or negative)
33 
34 At the end of a computation subinterval the even or odd buffer spikes
35 are enqueued in the priority queue after checking that the number
36 of spikes sent is equal to the number of spikes received.
37 */
38 
39 // The initial idea behind the optional phase2 is to avoid the large overhead of
40 // initiating a send of the up to 10k list of target hosts when a cell fires.
41 // I.e. when there are a small number of cells on a processor, this causes
42 // load balance problems.
43 // Load balance should be better if the send is distributed to a much smaller
44 // set of targets, which, when they receive the spike, pass it on to a neighbor
45 // set. A non-exclusive alternative to this is the use of RECORD_REPLAY
46 // which gives a very fast initiation but we have not been able to get that
47 // to complete in the sense of all the targets receiving their spikes before
48 // the conservation step.
49 // We expect that phase2 will work best in combination with ENQUEUE=2
50 // which has the greatest amount of overlap between computation
51 // and communication.
52 namespace coreneuron {
56 
57 #if NRN_MULTISEND
58 
59 // ENQUEUE 0 means to Multisend_ReceiveBuffer buffer -> InputPreSyn.send
60 // ENQUEUE 1 means to Multisend_ReceiveBuffer buffer -> psbuf -> InputPreSyn.send
61 // ENQUEUE 2 means to Multisend_ReceiveBuffer.incoming -> InputPreSyn.send
62 // Note that ENQUEUE 2 gives more overlap between computation and exchange
63 // since the enqueuing takes place during computation except for those
64 // remaining during conservation.
// Compile-time selection of the enqueue strategy described in the comment above.
65 #define ENQUEUE 2
66 
67 #if ENQUEUE == 2
// Counters used only by the ENQUEUE == 2 path; both are reset to 0 in the
// ENQUEUE == 2 section of the receive step.
// NOTE(review): presumably accumulated elapsed time; units are not visible here.
68 static unsigned long enq2_find_time_;
69 static unsigned long enq2_enqueue_time_; // includes enq2_find_time_
70 #endif
71 
// Number of Phase2Buffer entries allocated per receive buffer
// (see the phase2_buffer_ member initializer). Must stay a power of 2 so that
// PHASE2BUFFER_MASK (size - 1) is an all-ones bit mask.
72 #define PHASE2BUFFER_SIZE 2048 // power of 2
73 #define PHASE2BUFFER_MASK (PHASE2BUFFER_SIZE - 1)
// One pending phase-2 forward: a received spike that still has to be passed on
// through multisend_send_phase2().
74 struct Phase2Buffer {
75  InputPreSyn* ps;      // input source whose multisend_phase2_index_ selects the target ranks
76  double spiketime;    // spike time copied from the received spike
77  int gid;             // gid copied from the received spike
78 };
79 
80 #define MULTISEND_RECEIVEBUFFER_SIZE 10000
82  public:
84  virtual ~Multisend_ReceiveBuffer();
85  void init(int index);
86  void incoming(int gid, double spiketime);
87  void enqueue();
88  int index_{};
90  int count_{};
91  int maxcount_{};
92  bool busy_{};
93  int nsend_{}, nrecv_{}; // for checking conservation
94  int nsend_cell_{}; // cells that spiked this interval.
95  NRNMPI_Spike** buffer_{};
96 
97  void enqueue1();
98  void enqueue2();
99  InputPreSyn** psbuf_{};
100 
101  void phase2send();
102  int phase2_head_{};
103  int phase2_tail_{};
105  Phase2Buffer* phase2_buffer_{};
106 };
107 
// Compile-time upper bound on the number of receive buffers.
108 #define MULTISEND_INTERVAL 2
// Receive buffers, indexed by 0 or 1 (entries are tested against null in
// nrn_multisend_setup()).
109 static Multisend_ReceiveBuffer* multisend_receive_buffer[MULTISEND_INTERVAL];
// current_rbuf: set to 0 by nrn_multisend_init().
// next_rbuf: when it is 1, nrn_multisend_send() complements the outgoing gid;
// it is flipped between 0 and 1 in the receive step when n_multisend_interval == 2.
110 static int current_rbuf, next_rbuf;
111 #if MULTISEND_INTERVAL == 2
112 // note that if a spike is supposed to be received by multisend_receive_buffer[1]
113 // then during transmission its gid is complemented.
114 #endif
115 
// Flat target tables filled by nrn_multisend_setup_targets() and released with
// delete[] in nrn_multisend_cleanup().
// targets_phase1_ record at a PreSyn's multisend_index_: cnt, cnt_phase1, ranks...
// targets_phase2_ record at an InputPreSyn's multisend_phase2_index_: cnt_phase2, ranks...
116 static int* targets_phase1_;
117 static int* targets_phase2_;
118 
// Send the spike of presynaptic source `ps`, fired at time `t`, to its phase-1
// target ranks via nrnmpi_multisend().
// Does nothing when ps->multisend_index_ is negative.
// `nt` must equal nrn_threads; any other value hits assert(0).
// NOTE(review): the embedded listing numbers jump from 134 to 137 inside the
// `nt == nrn_threads` branch, so statements may be missing from this view
// (`cnt` is read but never used in the visible text) — confirm against the
// full source.
119 void nrn_multisend_send(PreSyn* ps, double t, NrnThread* nt) {
120  int i = ps->multisend_index_;
121  if (i >= 0) {
122  // format is cnt, cnt_phase1, array of target ranks.
123  // Valid for one or two phase.
124  int* ranks = targets_phase1_ + i;
125  int cnt = ranks[0];
126  int cnt_phase1 = ranks[1];
127  ranks += 2;
128  NRNMPI_Spike spk;
129  spk.gid = ps->output_index_;
130  spk.spiketime = t;
 // A complemented gid marks the spike as destined for receive buffer 1;
 // multisend_advance() undoes the complement on reception.
131  if (next_rbuf == 1) {
132  spk.gid = ~spk.gid;
133  }
134  if (nt == nrn_threads) {
137  nrnmpi_multisend(&spk, cnt_phase1, ranks);
138  } else {
139  assert(0);
140  }
141  }
142 }
143 
144 static void multisend_send_phase2(InputPreSyn* ps, int gid, double t) {
145  int i = ps->multisend_phase2_index_;
146  assert(i >= 0);
147  // format is cnt_phase2, array of target ranks
148  int* ranks = targets_phase2_ + i;
149  int cnt_phase2 = ranks[0];
150  ranks += 1;
151  NRNMPI_Spike spk;
152  spk.gid = gid;
153  spk.spiketime = t;
154  nrnmpi_multisend(&spk, cnt_phase2, ranks);
155 }
156 
158  : buffer_ {
159  new NRNMPI_Spike*[size_]
160 }
161 #if ENQUEUE == 1
162 , psbuf_ {
163  new InputPreSyn*[size_]
164 }
165 #endif
166 , phase2_buffer_{new Phase2Buffer[PHASE2BUFFER_SIZE]} {}
167 
169  nrn_assert(!busy_);
170  for (int i = 0; i < count_; ++i) {
171  delete buffer_[i];
172  }
173  delete[] buffer_;
174  if (psbuf_)
175  delete[] psbuf_;
176  delete[] phase2_buffer_;
177 }
179  index_ = index;
181  busy_ = false;
182  for (int i = 0; i < count_; ++i) {
183  delete buffer_[i];
184  }
185  count_ = 0;
186 
189 }
190 void Multisend_ReceiveBuffer::incoming(int gid, double spiketime) {
191  // printf("%d %p.incoming %g %g %d\n", nrnmpi_myid, this, t, spk->spiketime, spk->gid);
192  nrn_assert(!busy_);
193  busy_ = true;
194 
195  if (count_ >= size_) {
196  size_ *= 2;
197  NRNMPI_Spike** newbuf = new NRNMPI_Spike*[size_];
198  for (int i = 0; i < count_; ++i) {
199  newbuf[i] = buffer_[i];
200  }
201  delete[] buffer_;
202  buffer_ = newbuf;
203  if (psbuf_) {
204  delete[] psbuf_;
205  psbuf_ = new InputPreSyn*[size_];
206  }
207  }
208  NRNMPI_Spike* spk = new NRNMPI_Spike();
209  spk->gid = gid;
210  spk->spiketime = spiketime;
211  buffer_[count_++] = spk;
212  if (maxcount_ < count_) {
213  maxcount_ = count_;
214  }
215 
216  ++nrecv_;
217  busy_ = false;
218 }
220  // printf("%d %p.enqueue count=%d t=%g nrecv=%d nsend=%d\n", nrnmpi_myid, this, t, count_,
221  // nrecv_, nsend_);
222  nrn_assert(!busy_);
223  busy_ = true;
224 
225  for (int i = 0; i < count_; ++i) {
226  NRNMPI_Spike* spk = buffer_[i];
227 
228  auto gid2in_it = gid2in.find(spk->gid);
229  assert(gid2in_it != gid2in.end());
230  InputPreSyn* ps = gid2in_it->second;
231 
232  if (use_phase2_ && ps->multisend_phase2_index_ >= 0) {
236  pb.ps = ps;
237  pb.spiketime = spk->spiketime;
238  pb.gid = spk->gid;
239  }
240 
241  ps->send(spk->spiketime, net_cvode_instance, nrn_threads);
242  delete spk;
243  }
244 
245  count_ = 0;
246 #if ENQUEUE != 2
247  nrecv_ = 0;
248  nsend_ = 0;
249  nsend_cell_ = 0;
250 #endif
251  busy_ = false;
252  phase2send();
253 }
254 
// First stage of the two-stage enqueue: for every buffered spike, look up its
// InputPreSyn in gid2in (the gid must be present) and cache the pointer in
// psbuf_[i]. Spikes whose InputPreSyn has a non-negative
// multisend_phase2_index_ are additionally recorded in a Phase2Buffer entry
// when use_phase2_ is set. Finishes by clearing busy_ and calling phase2send().
// The spikes themselves stay in buffer_; count_ is not reset here.
// NOTE(review): `pb` is not declared in the visible text — the embedded
// listing numbers jump from 267 to 271, so its declaration appears to have
// been dropped from this view. Confirm against the full source.
255 void Multisend_ReceiveBuffer::enqueue1() {
256  // printf("%d %lx.enqueue count=%d t=%g nrecv=%d nsend=%d\n", nrnmpi_myid, (long)this, t,
257  // count_, nrecv_, nsend_);
258  nrn_assert(!busy_);
259  busy_ = true;
260  for (int i = 0; i < count_; ++i) {
261  NRNMPI_Spike* spk = buffer_[i];
262 
263  auto gid2in_it = gid2in.find(spk->gid);
264  assert(gid2in_it != gid2in.end());
265  InputPreSyn* ps = gid2in_it->second;
266  psbuf_[i] = ps;
267  if (use_phase2_ && ps->multisend_phase2_index_ >= 0) {
271  pb.ps = ps;
272  pb.spiketime = spk->spiketime;
273  pb.gid = spk->gid;
274  }
275  }
276  busy_ = false;
277  phase2send();
278 }
279 
280 void Multisend_ReceiveBuffer::enqueue2() {
281  // printf("%d %lx.enqueue count=%d t=%g nrecv=%d nsend=%d\n", nrnmpi_myid, (long)this, t,
282  // count_, nrecv_, nsend_);
283  nrn_assert(!busy_);
284  busy_ = false;
285  for (int i = 0; i < count_; ++i) {
286  NRNMPI_Spike* spk = buffer_[i];
287  InputPreSyn* ps = psbuf_[i];
289  delete spk;
290  }
291  count_ = 0;
292  nrecv_ = 0;
293  nsend_ = 0;
294  nsend_cell_ = 0;
295  busy_ = false;
296 }
297 
299  while (phase2_head_ != phase2_tail_) {
302  int gid = pb.gid;
303  if (index_) {
304  gid = ~gid;
305  }
306  multisend_send_phase2(pb.ps, gid, pb.spiketime);
307  }
308 }
309 
// Reset to 0 by nrn_multisend_setup() when multisend is in use.
310 static int max_ntarget_host;
311 // For one phase sending, max_multisend_targets is max_ntarget_host.
312 // For two phase sending, it is the maximum of all the
313 // ntarget_hosts_phase1 and ntarget_hosts_phase2.
314 static int max_multisend_targets;
315 
// (Re)initialise the multisend state: iterates over the n_multisend_interval
// receive buffers and selects buffer 0 as the current one.
// NOTE(review): the embedded listing numbers skip 318, 321 and 323, so the
// loop body, a statement after `current_rbuf = 0` and the ENQUEUE == 2 section
// are not visible in this view — confirm against the full source.
316 void nrn_multisend_init() {
317  for (int i = 0; i < n_multisend_interval; ++i) {
319  }
320  current_rbuf = 0;
322 #if ENQUEUE == 2
324 #endif
325 }
326 
// Drain the spikes currently available from nrnmpi_multisend_single_advance()
// and return how many were taken.
// A negative gid means the sender complemented it to address receive buffer 1;
// the complement is undone and j selects buffer 1 instead of buffer 0.
// NOTE(review): the embedded listing numbers skip 339, the line after the
// #endif; `j` and the decoded spike are not used in the visible text, so the
// statement that consumes them appears to be missing from this view.
327 static int multisend_advance() {
328  NRNMPI_Spike spk;
329  int i = 0;
330  while (nrnmpi_multisend_single_advance(&spk)) {
331  i += 1;
332  int j = 0;
333 #if MULTISEND_INTERVAL == 2
334  if (spk.gid < 0) {
335  spk.gid = ~spk.gid;
336  j = 1;
337  }
338 #endif
340  }
341  return i;
342 }
343 
// NOTE(review): this guard is nested inside the file's enclosing
// `#if NRN_MULTISEND` region (closed at "#endif // NRN_MULTISEND"), so it
// looks redundant — confirm.
344 #if NRN_MULTISEND
// Public advance hook; only acts when use_multisend_ is set.
// NOTE(review): the embedded listing numbers skip 347 and 349, so the
// statements inside the `if` (including the ENQUEUE == 2 section) are not
// visible in this view.
345 void nrn_multisend_advance() {
346  if (use_multisend_) {
348 #if ENQUEUE == 2
350 #endif
351  }
352 }
353 #endif
354 
356  // nrn_spike_exchange();
357  assert(nt == nrn_threads);
358  // double w1, w2;
359  int ncons = 0;
362 // w1 = nrn_wtime();
363 #if NRN_MULTISEND & 1
364  if (use_multisend_) {
366  nrnmpi_barrier();
368  // with two phase we expect conservation to hold and ncons should
369  // be 0.
370  while (nrnmpi_multisend_conserve(s, r) != 0) {
372  ++ncons;
373  }
374  }
375 #endif
376  // w1 = nrn_wtime() - w1;
377  // w2 = nrn_wtime();
378 
379 #if ENQUEUE == 0
381 #endif
382 #if ENQUEUE == 1
385 #endif
386 #if ENQUEUE == 2
389 
392 
393  enq2_find_time_ = 0;
394  enq2_enqueue_time_ = 0;
395 #endif // ENQUEUE == 2
396 // wt1_ = nrn_wtime() - w2;
397 // wt_ = w1;
398 #if MULTISEND_INTERVAL == 2
399  // printf("%d reverse buffers %g\n", nrnmpi_myid, t);
400  if (n_multisend_interval == 2) {
402  next_rbuf = ((next_rbuf + 1) & 1);
403  }
404 #endif
405 }
406 
407 void nrn_multisend_cleanup() {
408  if (targets_phase1_) {
409  delete[] targets_phase1_;
410  targets_phase1_ = nullptr;
411  }
412 
413  if (targets_phase2_) {
414  delete[] targets_phase2_;
415  targets_phase2_ = nullptr;
416  }
417 
418  // cleanup MultisendReceiveBuffer here as well
419 }
420 
// Set up multisend: returns immediately unless use_multisend_ is set, resets
// max_ntarget_host, builds the target tables through
// nrn_multisend_setup_targets(), and then handles the case where
// multisend_receive_buffer[0] is still null.
// NOTE(review): the embedded listing numbers skip 422, 426, 438, 444 and
// 447-448, so several statements (including the bodies of the trailing
// conditionals) are missing and the braces look unbalanced in this view —
// confirm against the full source.
421 void nrn_multisend_setup() {
423  if (!use_multisend_) {
424  return;
425  }
427  // if (nrnmpi_myid == 0) printf("multisend_setup()\n");
428  // although we only care about the set of hosts that gid2out_
429  // sends spikes to (source centric). We do not want to send
430  // the entire list of gid2in (which may be 10000 times larger
431  // than gid2out) from every machine to every machine.
432  // so we accomplish the task in two phases the first of which
433  // involves allgather with a total receive buffer size of number
434  // of cells (even that is too large and we will split it up
435  // into chunks). And the second, an
436  // allreduce with receive buffer size of number of hosts.
437  max_ntarget_host = 0;
439 
440  // completely new algorithm does one and two phase.
 // targets_phase1_ / targets_phase2_ are passed by reference (int*&) and are
 // later freed with delete[] in nrn_multisend_cleanup().
441  nrn_multisend_setup_targets(use_phase2_, targets_phase1_, targets_phase2_);
442 
443  if (!multisend_receive_buffer[0]) {
445  }
446 #if MULTISEND_INTERVAL == 2
449  }
450 #endif
451 }
452 #endif // NRN_MULTISEND
453 } // namespace coreneuron
static void nrnmpi_barrier()
void init(int index)
Definition: multisend.cpp:185
void incoming(int gid, double spiketime)
Definition: multisend.cpp:196
virtual ~Multisend_ReceiveBuffer()
Definition: multisend.cpp:174
NRNMPI_Spike ** buffer_
Definition: multisend.cpp:117
Phase2Buffer * phase2_buffer_
Definition: multisend.cpp:129
Definition: netcon.h:258
int output_index_
Definition: netcon.h:308
virtual void send(double sendtime, NetCvode *, NrnThread *)
Definition: netcvode.cpp:3016
#define cnt
Definition: tqueue.hpp:44
#define i
Definition: md1redef.h:19
#define assert(ex)
Definition: hocassrt.h:24
static int ncons
Definition: kinetic.cpp:456
void init()
Definition: init.cpp:141
THIS FILE IS AUTO GENERATED DONT MODIFY IT.
NrnThread * nrn_threads
Definition: multicore.cpp:56
bool use_phase2_
Definition: multisend.cpp:54
void nrn_multisend_send(PreSyn *, double t, NrnThread *)
std::map< int, InputPreSyn * > gid2in
Definition: nrn_setup.cpp:160
bool use_multisend_
Definition: multisend.cpp:53
int n_multisend_interval
Definition: multisend.cpp:55
void nrn_multisend_receive(NrnThread *)
void nrn_multisend_setup_targets(bool use_phase2, int *&targets_phase1, int *&targets_phase2)
void nrn_multisend_advance()
NetCvode * net_cvode_instance
Definition: netcvode.cpp:35
#define nrn_assert(x)
assert()-like macro, independent of NDEBUG status
Definition: nrn_assert.h:33
size_t j
#define MULTISEND_RECEIVEBUFFER_SIZE
Definition: multisend.cpp:101
void nrn_multisend_setup()
Definition: multisend.cpp:617
static Multisend_ReceiveBuffer * multisend_receive_buffer[2]
Definition: multisend.cpp:156
s
Definition: multisend.cpp:521
void nrnmpi_multisend_comm()
#define PHASE2BUFFER_MASK
Definition: multisend.cpp:91
static void nrn_multisend_init()
Definition: multisend.cpp:375
int nrnmpi_multisend_single_advance(NRNMPI_Spike *)
static void nrn_multisend_cleanup()
Definition: multisend.cpp:566
#define ENQUEUE
Definition: multisend.cpp:83
static int multisend_advance()
Definition: multisend.cpp:389
static int next_rbuf
Definition: multisend.cpp:157
static int max_multisend_targets
Definition: multisend.cpp:317
static unsigned long enq2_enqueue_time_
Definition: multisend.cpp:87
int nrnmpi_multisend_conserve(int nsend, int nrecv)
static int current_rbuf
Definition: multisend.cpp:157
static int use_phase2_
Definition: multisend.cpp:132
static int max_ntarget_host
Definition: multisend.cpp:313
static unsigned long enq2_find_time_
Definition: multisend.cpp:86
if(n_multisend_interval==2)
Definition: multisend.cpp:534
multisend_receive_buffer[current_rbuf] phase2_nsend_
Definition: multisend.cpp:523
multisend_receive_buffer[current_rbuf] enqueue()
multisend_receive_buffer[current_rbuf] phase2_nsend_cell_
Definition: multisend.cpp:522
#define PHASE2BUFFER_SIZE
Definition: multisend.cpp:90
static int n_multisend_interval
Definition: netpar.cpp:32
short index
Definition: cabvars.h:11
double spiketime
Definition: nrnmpi.h:18
int gid
Definition: nrnmpi.h:17
Represent main neuron object computed by single thread.
Definition: multicore.h:58
PreSyn * ps
Definition: multisend.cpp:93
double spiketime
Definition: multisend.cpp:94