NEURON
netpar.cpp
Go to the documentation of this file.
1 #include "utils/profile/profiler_interface.h"
2 #include <../../nrnconf.h>
3 #include <InterViews/resource.h>
4 #include <math.h>
5 #include "nrncvode.h"
6 #include "nrniv_mf.h"
7 #include <nrnmpi.h>
8 #include <nrnoc2iv.h>
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <netpar.h>
12 #include <unordered_map>
13 #include <bbs.h>
14 
15 #undef MD
16 #define MD 2147483648.
17 
18 class PreSyn;
19 
20 // hash table where buckets are binary search maps
21 using Gid2PreSyn = std::unordered_map<int, PreSyn*>;
22 
23 #include <errno.h>
24 #include <netcon.h>
25 #include <cvodeobj.h>
26 #include <netcvode.h>
27 #include "ivocvect.h"
28 
29 #include <atomic>
30 #include <vector>
31 
33 
34 #if NRN_MUSIC
35 #include "nrnmusicapi.h"
36 int nrnmusic;
37 #endif
38 
44 static double t_exchange_;
45 static double dt1_; // 1/dt
46 static int localgid_size_;
47 static int ag_send_size_;
48 static int ag_send_nspike_;
49 static int ovfl_capacity_;
50 static int ovfl_;
51 static unsigned char* spfixout_;
52 static unsigned char* spfixin_;
53 static unsigned char* spfixin_ovfl_;
54 static int nout_;
55 static int* nin_;
58 static int icapacity_;
59 static void alloc_space();
60 
62 extern double t, dt;
63 extern int nrn_use_selfqueue_;
64 extern void nrn_pending_selfqueue(double, NrnThread*);
65 extern Object* nrn_sec2cell(Section*);
66 extern void ncs2nrn_integrate(double tstop);
67 int nrnmpi_spike_compress(int nspike, bool gid_compress, int xchng_meth);
69 int nrn_set_timeout(int);
70 void nrnmpi_gid_clear(int);
71 extern void nrn_partrans_clear();
73 extern double nrn_multisend_receive_time(int);
74 typedef void (*PFIO)(int, Object*);
75 extern void nrn_gidout_iter(PFIO);
76 extern Object* nrn_gid2obj(int);
77 extern PreSyn* nrn_gid2presyn(int);
78 extern int nrn_gid_exists(int);
79 extern double nrnmpi_step_wait_; // barrier at beginning of spike exchange.
80 
81 /**
82  * @brief NEURON callback used from CORENEURON to transfer all spike vectors after simulation
83  *
84  * @param spiketvec - vector of spikes
85  * @param spikegidvec - vector of gids
86  * @return 1 if CORENEURON shall drop writing `out.dat`; 0 otherwise
87  */
88 extern "C" int nrnthread_all_spike_vectors_return(std::vector<double>& spiketvec,
89  std::vector<int>& spikegidvec);
90 
91 #if NRNMPI == 0
93  return 0.;
94 }
95 #endif
96 
97 #if NRNMPI
98 extern void nrnmpi_split_clear();
99 #endif
100 extern void nrnmpi_multisplit_clear();
101 
102 static double set_mindelay(double maxdelay);
103 
104 #if NRNMPI
105 
106 #include "../nrnmpi/mpispike.h"
107 
108 void nrn_timeout(int);
109 extern int nrnmpi_int_allmax(int);
110 extern void nrnmpi_int_allgather(int*, int*, int);
111 void nrn2ncs_outputevent(int netcon_output_index, double firetime);
112 bool nrn_use_compress_; // global due to bbsavestate
113 #define use_compress_ nrn_use_compress_
114 
115 #ifdef USENCS
116 extern int ncs_multisend_sending_info(int**);
117 extern int ncs_multisend_target_hosts(int, int**);
118 extern int ncs_multisend_target_info(int**);
119 extern int ncs_multisend_mindelays(int**, double**);
120 
121 // get minimum delays for all presyn objects in gid2in_
122 int ncs_netcon_mindelays(int** hosts, double** delays) {
123  return ncs_multisend_mindelays(hosts, delays);
124 }
125 
126 double ncs_netcon_localmindelay(int srcgid) {
127  PreSyn* ps = gid2out_.at(srcgid);
128  return ps->mindelay();
129 }
130 
131 // get the number of netcons for an object, if it sends here
132 int ncs_netcon_count(int srcgid, bool localNetCons) {
133  const auto& map = localNetCons ? gid2out_ : gid2in_;
134  const auto& iter = map.find(srcgid);
135  PreSyn* ps{iter != map.end() ? iter->second : nullptr};
136 
137  if (!ps) { // no cells on this cpu receive from the given gid
138  fprintf(stderr, "should never happen!\n");
139  return 0;
140  }
141 
142  return ps->dil_.count();
143 }
144 
145 // inject a spike into the appropriate netcon
146 void ncs_netcon_inject(int srcgid, int netconIndex, double spikeTime, bool localNetCons) {
148  const auto& map = localNetCons ? gid2out_ : gid2in_;
149  const auto& iter = map.find(srcgid);
150  PreSyn* ps{iter != map.end() ? iter->second : nullptr};
151  if (!ps) { // no cells on this cpu receive from the given gid
152  return;
153  }
154 
155  // fprintf( stderr, "gid %d index %d!\n", srcgid, netconIndex );
156  NetCon* d = ps->dil_.item(netconIndex);
157  NrnThread* nt = nrn_threads;
158  if (d->active_ && d->target_) {
159  ns->bin_event(spikeTime + d->delay_, d, nt);
160  }
161 }
162 
163 int ncs_gid_receiving_info(int** presyngids) {
164  return ncs_multisend_target_info(presyngids);
165 }
166 
167 // given the gid of a cell, retrieve its target count
168 int ncs_gid_sending_count(int** sendlist2build) {
169  return ncs_multisend_sending_info(sendlist2build);
170 }
171 
172 int ncs_target_hosts(int gid, int** targetnodes) {
173  return ncs_multisend_target_hosts(gid, targetnodes);
174 }
175 
176 #endif
177 
178 // for compressed gid info during spike exchange
179 bool nrn_use_localgid_;
180 void nrn_outputevent(unsigned char localgid, double firetime);
181 static std::vector<std::unique_ptr<Gid2PreSyn>> localmaps_;
182 
183 static int nsend_, nsendmax_, nrecv_, nrecv_useful_;
184 static IvocVect* max_histogram_;
185 
186 static int ocapacity_; // for spikeout_
187 // require it to be smaller than min_interprocessor_delay.
188 static double wt_; // wait time for nrnmpi_spike_exchange
189 static double wt1_; // time to find the PreSyns and send the spikes.
190 static int spfixout_capacity_;
191 static int idxout_;
192 static void nrn_spike_exchange_compressed(NrnThread*);
193 #endif // NRNMPI
194 
195 #if NRNMPI
196 bool use_multisend_; // false: allgather, true: multisend (MPI_ISend)
197 static void nrn_multisend_setup();
198 static void nrn_multisend_init();
199 static void nrn_multisend_receive(NrnThread*);
200 extern void nrn_multisend_send(PreSyn*, double t);
202 #endif
203 
204 static int active_;
205 static double usable_mindelay_;
207 static double mindelay_; // the one actually used. Some of our optional algorithms
208 static double last_maxstep_arg_;
209 static NetParEvent* npe_; // nrn_nthread of them
210 static int n_npe_; // just to compare with nrn_nthread
211 
213  return usable_mindelay_;
214 }
216  return netcon_sym_;
217 }
219  return gid2out_;
220 }
221 
222 #if NRNMPI
223 // for combination of threads and mpi.
224 #if NRN_ENABLE_THREADS
225 static MUTDEC
226 #endif
227 static std::atomic<int> seqcnt_;
228 static NrnThread* last_nt_;
229 #endif
230 
232  wx_ = ws_ = 0.;
233  ithread_ = -1;
234 }
236 void NetParEvent::send(double tt, NetCvode* nc, NrnThread* nt) {
237  nc->event(tt + usable_mindelay_, this, nt);
238 }
239 
240 
241 void NetParEvent::deliver(double tt, NetCvode* nc, NrnThread* nt) {
242  int seq;
243  if (nrn_use_selfqueue_) { // first handle pending flag=1 self events
244  nrn_pending_selfqueue(tt, nt);
245  }
246  // has to be the last event at this time in order to avoid a race
247  // condition with HocEvent that may call things such as pc.barrier
248  // actually allthread HocEvent (cvode.event(tev) and cvode.event(tev,stmt)
249  // will be executed last after a thread join when nrn_allthread_handle
250  // is called.
252  nt->_stop_stepping = 1;
253  nt->_t = tt;
254 #if NRNMPI
255  if (nrnmpi_numprocs > 0) {
256  seq = ++seqcnt_;
257  if (seq == nrn_nthread) {
258  last_nt_ = nt;
259 #if NRNMPI
260  if (use_multisend_) {
262  } else {
263  nrn_spike_exchange(nt);
264  }
265 #else
266  nrn_spike_exchange(nt);
267 #endif
268  wx_ += wt_;
269  ws_ += wt1_;
270  seqcnt_ = 0;
271  }
272  }
273 #endif
274  send(tt, nc, nt);
275 }
276 void NetParEvent::pgvts_deliver(double tt, NetCvode* nc) {
277  assert(nrn_nthread == 1);
278  deliver(tt, nc, nrn_threads);
279 }
280 
281 void NetParEvent::pr(const char* m, double tt, NetCvode* nc) {
282  Printf("%s NetParEvent %d t=%.15g tt-t=%g\n", m, ithread_, tt, tt - nrn_threads[ithread_]._t);
283 }
284 
286  // pr("savestate_save", 0, net_cvode_instance);
287  NetParEvent* npe = new NetParEvent();
288  npe->ithread_ = ithread_;
289  return npe;
290 }
291 
293  int i;
294  char buf[100];
295  nrn_assert(fgets(buf, 100, f));
296  nrn_assert(sscanf(buf, "%d\n", &i) == 1);
297  // printf("NetParEvent::savestate_read %d\n", i);
298  NetParEvent* npe = new NetParEvent();
299  npe->ithread_ = i;
300  return npe;
301 }
302 
304  // pr("savestate_write", 0, net_cvode_instance);
305  fprintf(f, "%d\n%d\n", NetParEventType, ithread_);
306 }
307 
309 #if NRNMPI
310  if (use_compress_) {
311  t_exchange_ = t;
312  }
313 #endif
314  if (ithread_ == 0) {
315  // npe_->pr("savestate_restore", tt, nc);
316  for (int i = 0; i < nrn_nthread; ++i)
317  if (i < n_npe_) {
318  nc->event(tt, npe_ + i, nrn_threads + i);
319  }
320  }
321 }
322 
323 #if NRNMPI
324 inline static void sppk(unsigned char* c, int gid) {
325  for (int i = localgid_size_ - 1; i >= 0; --i) {
326  c[i] = gid & 255;
327  gid >>= 8;
328  }
329 }
330 inline static int spupk(unsigned char* c) {
331  int gid = *c++;
332  for (int i = 1; i < localgid_size_; ++i) {
333  gid <<= 8;
334  gid += *c++;
335  }
336  return gid;
337 }
338 
339 void nrn_outputevent(unsigned char localgid, double firetime) {
340  if (!active_) {
341  return;
342  }
343  MUTLOCK
344  nout_++;
345  int i = idxout_;
346  idxout_ += 2;
347  if (idxout_ >= spfixout_capacity_) {
348  spfixout_capacity_ *= 2;
349  spfixout_ = (unsigned char*) hoc_Erealloc(spfixout_,
350  spfixout_capacity_ * sizeof(unsigned char));
351  hoc_malchk();
352  }
353  spfixout_[i++] = (unsigned char) ((firetime - t_exchange_) * dt1_ + .5);
354  spfixout_[i] = localgid;
355  // printf("%d idx=%d lgid=%d firetime=%g t_exchange_=%g [0]=%d [1]=%d\n", nrnmpi_myid, i,
356  // (int)localgid, firetime, t_exchange_, (int)spfixout_[i-1], (int)spfixout_[i]);
357  MUTUNLOCK
358 }
359 
360 #ifndef USENCS
361 void nrn2ncs_outputevent(int gid, double firetime) {
362  if (!active_) {
363  return;
364  }
365  MUTLOCK
366  if (use_compress_) {
367  nout_++;
368  int i = idxout_;
369  idxout_ += 1 + localgid_size_;
370  if (idxout_ >= spfixout_capacity_) {
371  spfixout_capacity_ *= 2;
372  spfixout_ = (unsigned char*) hoc_Erealloc(spfixout_,
373  spfixout_capacity_ * sizeof(unsigned char));
374  hoc_malchk();
375  }
376  // printf("%d nrnncs_outputevent %d %.20g %.20g %d\n", nrnmpi_myid, gid, firetime,
377  // t_exchange_, (int)((unsigned char)((firetime - t_exchange_)*dt1_ + .5)));
378  spfixout_[i++] = (unsigned char) ((firetime - t_exchange_) * dt1_ + .5);
379  // printf("%d idx=%d firetime=%g t_exchange_=%g spfixout=%d\n", nrnmpi_myid, i, firetime,
380  // t_exchange_, (int)spfixout_[i-1]);
381  sppk(spfixout_ + i, gid);
382  // printf("%d idx=%d gid=%d spupk=%d\n", nrnmpi_myid, i, gid, spupk(spfixout_+i));
383  } else {
384 #if nrn_spikebuf_size == 0
385  int i = nout_++;
386  if (i >= ocapacity_) {
387  ocapacity_ *= 2;
388  spikeout_ = (NRNMPI_Spike*) hoc_Erealloc(spikeout_, ocapacity_ * sizeof(NRNMPI_Spike));
389  hoc_malchk();
390  }
391  // printf("%d cell %d in slot %d fired at %g\n", nrnmpi_myid, gid, i, firetime);
392  spikeout_[i].gid = gid;
393  spikeout_[i].spiketime = firetime;
394 #else
395  int i = nout_++;
396  if (i >= nrn_spikebuf_size) {
397  i -= nrn_spikebuf_size;
398  if (i >= ocapacity_) {
399  ocapacity_ *= 2;
401  ocapacity_ * sizeof(NRNMPI_Spike));
402  hoc_malchk();
403  }
404  spikeout_[i].gid = gid;
405  spikeout_[i].spiketime = firetime;
406  } else {
407  spbufout_->gid[i] = gid;
408  spbufout_->spiketime[i] = firetime;
409  }
410 #endif
411  }
412  MUTUNLOCK
413  // printf("%d cell %d in slot %d fired at %g\n", nrnmpi_myid, gid, i, firetime);
414 }
415 #endif // USENCS
416 #endif // NRNMPI
417 
418 static int nrn_need_npe() {
419  int b = 0;
420  if (active_) {
421  b = 1;
422  }
423  if (nrn_use_selfqueue_) {
424  b = 1;
425  }
426  if (nrn_nthread > 1) {
427  b = 1;
428  }
429  if (b) {
430  if (last_maxstep_arg_ == 0) {
431  last_maxstep_arg_ = 100.;
432  }
434  } else {
435  if (npe_) {
436  delete[] npe_;
437  npe_ = NULL;
438  n_npe_ = 0;
439  }
440  }
441  return b;
442 }
443 
444 static void calc_actual_mindelay() {
445  // reasons why mindelay_ can be smaller than min_interprocessor_delay
446  // are use_multisend_
447 #if NRNMPI
448  if (use_multisend_ && n_multisend_interval == 2) {
450  } else {
452  }
453 #endif
454 }
455 
456 #if NRNMPI
457 #include "multisend.cpp"
458 #else
459 #define TBUFSIZE 0
460 #define TBUF /**/
461 #endif
462 
464  if (nrnmpi_step_wait_ >= 0.0) {
465  nrnmpi_step_wait_ = 0.0;
466  }
467 #ifdef USENCS
469  return;
470 #endif
471  // printf("nrn_spike_exchange_init\n");
472  if (!nrn_need_npe()) {
473  return;
474  }
475  // if (!active_ && !nrn_use_selfqueue_) { return; }
476  alloc_space();
477  // printf("nrnmpi_use=%d active=%d\n", nrnmpi_use, active_);
480  if (cvode_active_ == 0 && nrn_nthread > 1) {
481  usable_mindelay_ -= dt;
482  }
483  if ((usable_mindelay_ < 1e-9) || (cvode_active_ == 0 && usable_mindelay_ < dt)) {
484  if (nrnmpi_myid == 0) {
485  hoc_execerror("usable mindelay is 0", "(or less than dt for fixed step method)");
486  } else {
487  return;
488  }
489  }
490 
491 #if NRN_MUSIC
492  if (nrnmusic) {
494  }
495 #endif
496 
497 #if TBUFSIZE
498  itbuf_ = 0;
499 #endif
500 
501 #if NRNMPI
502  if (use_multisend_) {
504  }
505 #endif
506 
507  if (n_npe_ != nrn_nthread) {
508  if (npe_) {
509  delete[] npe_;
510  }
513  }
514  for (int i = 0; i < nrn_nthread; ++i) {
515  npe_[i].ithread_ = i;
516  npe_[i].wx_ = 0.;
517  npe_[i].ws_ = 0.;
519  }
520 #if NRNMPI
521  if (use_compress_) {
522  idxout_ = 2;
523  t_exchange_ = t;
524  dt1_ = 1. / dt;
525  usable_mindelay_ = floor(mindelay_ * dt1_ + 1e-9) * dt;
527  } else {
528 #if nrn_spikebuf_size > 0
529  if (spbufout_) {
530  spbufout_->nspike = 0;
531  }
532 #endif
533  }
534  nout_ = 0;
535  nsend_ = nsendmax_ = nrecv_ = nrecv_useful_ = 0;
536  if (nrnmpi_numprocs > 0) {
537  if (nrn_nthread > 0) {
538 #if NRN_ENABLE_THREADS
539  if (!mut_) {
540  MUTCONSTRUCT(1)
541  }
542 #endif
543  } else {
545  }
546  }
547 #endif // NRNMPI
548  // if (nrnmpi_myid == 0){printf("usable_mindelay_ = %g\n", usable_mindelay_);}
549 }
550 
551 #if !NRNMPI
553 #else
554 void nrn_spike_exchange(NrnThread* nt) {
555  nrn::Instrumentor::phase p_spike_exchange("spike-exchange");
556  if (!active_) {
557  return;
558  }
559 #if NRNMPI
560  if (use_multisend_) {
562  return;
563  }
564 #endif
565  if (use_compress_) {
566  nrn_spike_exchange_compressed(nt);
567  return;
568  }
569  TBUF
570 #if TBUFSIZE
571  nrnmpi_barrier();
572 #endif
573  TBUF
574  double wt;
575  int i, n;
576  nsend_ += nout_;
577  if (nsendmax_ < nout_) {
578  nsendmax_ = nout_;
579  }
580 #if nrn_spikebuf_size > 0
581  spbufout_->nspike = nout_;
582 #endif
583  wt = nrnmpi_wtime();
584  if (nrnmpi_step_wait_ >= 0.) {
585  nrnmpi_barrier();
587  }
588  n = nrnmpi_spike_exchange(&ovfl_, &nout_, nin_, spikeout_, &spikein_, &icapacity_);
589  wt_ = nrnmpi_wtime() - wt;
590  wt = nrnmpi_wtime();
591  TBUF
592 #if TBUFSIZE
593  tbuf_[itbuf_++] = (unsigned long) nout_;
594  tbuf_[itbuf_++] = (unsigned long) n;
595 #endif
596 
597  errno = 0;
598  // if (n > 0) {
599  // printf("%d nrn_spike_exchange sent %d received %d\n", nrnmpi_myid, nout_, n);
600  //}
601  nout_ = 0;
602  if (n == 0) {
603  if (max_histogram_) {
604  vector_vec(max_histogram_)[0] += 1.;
605  }
606  TBUF
607  return;
608  }
609  nrecv_ += n;
610  if (max_histogram_) {
611  int mx = 0;
612  if (n > 0) {
613  for (i = nrnmpi_numprocs - 1; i >= 0; --i) {
614 #if nrn_spikebuf_size == 0
615  if (mx < nin_[i]) {
616  mx = nin_[i];
617  }
618 #else
619  if (mx < spbufin_[i].nspike) {
620  mx = spbufin_[i].nspike;
621  }
622 #endif
623  }
624  }
625  int ms = vector_capacity(max_histogram_) - 1;
626  mx = (mx < ms) ? mx : ms;
627  vector_vec(max_histogram_)[mx] += 1.;
628  }
629 #if nrn_spikebuf_size > 0
630  for (i = 0; i < nrnmpi_numprocs; ++i) {
631  int j;
632  int nn = spbufin_[i].nspike;
633  if (nn > nrn_spikebuf_size) {
634  nn = nrn_spikebuf_size;
635  }
636  for (j = 0; j < nn; ++j) {
637  auto iter = gid2in_.find(spbufin_[i].gid[j]);
638  if (iter != gid2in_.end()) {
639  PreSyn* ps = iter->second;
640  ps->send(spbufin_[i].spiketime[j], net_cvode_instance, nt);
641  ++nrecv_useful_;
642  }
643  }
644  }
645  n = ovfl_;
646 #endif // nrn_spikebuf_size > 0
647  for (i = 0; i < n; ++i) {
648  auto iter = gid2in_.find(spikein_[i].gid);
649  if (iter != gid2in_.end()) {
650  PreSyn* ps = iter->second;
651  ps->send(spikein_[i].spiketime, net_cvode_instance, nt);
652  ++nrecv_useful_;
653  }
654  }
655  wt1_ = nrnmpi_wtime() - wt;
656  TBUF
657 }
658 
659 void nrn_spike_exchange_compressed(NrnThread* nt) {
660  if (!active_) {
661  return;
662  }
663  TBUF
664 #if TBUFSIZE
665  nrnmpi_barrier();
666 #endif
667  TBUF
669  double wt;
670  int i, n, idx;
671  nsend_ += nout_;
672  if (nsendmax_ < nout_) {
673  nsendmax_ = nout_;
674  }
675  assert(nout_ < 0x10000);
676  spfixout_[1] = (unsigned char) (nout_ & 0xff);
677  spfixout_[0] = (unsigned char) (nout_ >> 8);
678 
679  wt = nrnmpi_wtime();
680  if (nrnmpi_step_wait_ >= 0.) {
681  nrnmpi_barrier();
683  }
684  n = nrnmpi_spike_exchange_compressed(localgid_size_,
688  &ovfl_,
689  spfixout_,
690  spfixin_,
691  &spfixin_ovfl_,
692  nin_);
693  wt_ = nrnmpi_wtime() - wt;
694  wt = nrnmpi_wtime();
695  TBUF
696 #if TBUFSIZE
697  tbuf_[itbuf_++] = (unsigned long) nout_;
698  tbuf_[itbuf_++] = (unsigned long) n;
699 #endif
700  errno = 0;
701  // if (n > 0) {
702  // printf("%d nrn_spike_exchange sent %d received %d\n", nrnmpi_myid, nout_, n);
703  //}
704  nout_ = 0;
705  idxout_ = 2;
706  if (n == 0) {
707  if (max_histogram_) {
708  vector_vec(max_histogram_)[0] += 1.;
709  }
710  t_exchange_ = nt->_t;
711  TBUF
712  return;
713  }
714  nrecv_ += n;
715  if (max_histogram_) {
716  int mx = 0;
717  if (n > 0) {
718  for (i = nrnmpi_numprocs - 1; i >= 0; --i) {
719  if (mx < nin_[i]) {
720  mx = nin_[i];
721  }
722  }
723  }
724  int ms = vector_capacity(max_histogram_) - 1;
725  mx = (mx < ms) ? mx : ms;
726  vector_vec(max_histogram_)[mx] += 1.;
727  }
728  if (nrn_use_localgid_) {
729  int idxov = 0;
730  for (i = 0; i < nrnmpi_numprocs; ++i) {
731  int j, nnn;
732  int nn = nin_[i];
733  if (nn) {
734  if (i == nrnmpi_myid) { // skip but may need to increment idxov.
735  if (nn > ag_send_nspike_) {
736  idxov += (nn - ag_send_nspike_) * (1 + localgid_size_);
737  }
738  continue;
739  }
740  Gid2PreSyn* gps = localmaps_[i].get();
741  if (nn > ag_send_nspike_) {
742  nnn = ag_send_nspike_;
743  } else {
744  nnn = nn;
745  }
746  idx = 2 + i * ag_send_size_;
747  for (j = 0; j < nnn; ++j) {
748  // order is (firetime,gid) pairs.
749  double firetime = spfixin_[idx++] * dt + t_exchange_;
750  int lgid = (int) spfixin_[idx];
751  idx += localgid_size_;
752  auto iter = gps->find(lgid);
753  if (iter != gps->end()) {
754  PreSyn* ps = iter->second;
755  ps->send(firetime + 1e-10, net_cvode_instance, nt);
756  ++nrecv_useful_;
757  }
758  }
759  for (; j < nn; ++j) {
760  double firetime = spfixin_ovfl_[idxov++] * dt + t_exchange_;
761  int lgid = (int) spfixin_ovfl_[idxov];
762  idxov += localgid_size_;
763  auto iter = gps->find(lgid);
764  if (iter != gps->end()) {
765  PreSyn* ps = iter->second;
766  ps->send(firetime + 1e-10, net_cvode_instance, nt);
767  ++nrecv_useful_;
768  }
769  }
770  }
771  }
772  } else {
773  for (i = 0; i < nrnmpi_numprocs; ++i) {
774  int j;
775  int nn = nin_[i];
776  if (nn > ag_send_nspike_) {
777  nn = ag_send_nspike_;
778  }
779  idx = 2 + i * ag_send_size_;
780  for (j = 0; j < nn; ++j) {
781  // order is (firetime,gid) pairs.
782  double firetime = spfixin_[idx++] * dt + t_exchange_;
783  int gid = spupk(spfixin_ + idx);
784  idx += localgid_size_;
785  auto iter = gid2in_.find(gid);
786  if (iter != gid2in_.end()) {
787  PreSyn* ps = iter->second;
788  ps->send(firetime + 1e-10, net_cvode_instance, nt);
789  ++nrecv_useful_;
790  }
791  }
792  }
793  n = ovfl_;
794  idx = 0;
795  for (i = 0; i < n; ++i) {
796  double firetime = spfixin_ovfl_[idx++] * dt + t_exchange_;
797  int gid = spupk(spfixin_ovfl_ + idx);
798  idx += localgid_size_;
799  auto iter = gid2in_.find(gid);
800  if (iter != gid2in_.end()) {
801  PreSyn* ps = iter->second;
802  ps->send(firetime + 1e-10, net_cvode_instance, nt);
803  ++nrecv_useful_;
804  }
805  }
806  }
807  t_exchange_ = nt->_t;
808  wt1_ = nrnmpi_wtime() - wt;
809  TBUF
810 }
811 
812 static void mk_localgid_rep() {
813  // how many gids are there on this machine
814  // and can they be compressed into one byte
815  int ngid = 0;
816  for (const auto& iter: gid2out_) {
817  PreSyn* ps = iter.second;
818  if (ps->output_index_ >= 0) {
819  ++ngid;
820  }
821  }
822  int ngidmax = nrnmpi_int_allmax(ngid);
823  if (ngidmax > 256) {
824  // do not compress
825  return;
826  }
827  localgid_size_ = sizeof(unsigned char);
828  nrn_use_localgid_ = true;
829 
830  // allocate Allgather receive buffer (send is the nrnmpi_myid one)
831  int* rbuf = new int[nrnmpi_numprocs * (ngidmax + 1)];
832  int* sbuf = new int[ngidmax + 1];
833 
834  sbuf[0] = ngid;
835  ++sbuf;
836  ngid = 0;
837  // define the local gid and fill with the gids on this machine
838  for (const auto& iter: gid2out_) {
839  PreSyn* ps = iter.second;
840  if (ps->output_index_ >= 0) {
841  ps->localgid_ = (unsigned char) ngid;
842  sbuf[ngid] = ps->output_index_;
843  ++ngid;
844  }
845  }
846  --sbuf;
847 
848  // exchange everything
849  nrnmpi_int_allgather(sbuf, rbuf, ngidmax + 1);
850  delete[] sbuf;
851  errno = 0;
852 
853  // create the maps
854  // there is a lot of potential for efficiency here. i.e. use of
855  // perfect hash functions, or even simple Vectors.
856  localmaps_.clear();
857  localmaps_.resize(nrnmpi_numprocs);
858 
859  for (int i = 0; i < nrnmpi_numprocs; ++i)
860  if (i != nrnmpi_myid) {
861  localmaps_[i].reset(new Gid2PreSyn());
862  }
863 
864  // fill in the maps
865  for (int i = 0; i < nrnmpi_numprocs; ++i)
866  if (i != nrnmpi_myid) {
867  sbuf = rbuf + i * (ngidmax + 1);
868  ngid = *(sbuf++);
869  for (int k = 0; k < ngid; ++k) {
870  auto iter = gid2in_.find(int(sbuf[k]));
871  if (iter != gid2in_.end()) {
872  PreSyn* ps = iter->second;
873  (*localmaps_[i])[k] = ps;
874  }
875  }
876  }
877 
878  // cleanup
879  delete[] rbuf;
880 }
881 
882 #endif // NRNMPI
883 
884 // may stimulate a gid for a cell not owned by this cpu. This allows
885 // us to run single cells or subnets and stimulate exactly according to
886 // their input in a full parallel net simulation.
887 // For some purposes, it may be useful to simulate a spike from a
888 // cell that does exist and would normally send its own spike, eg.
889 // recurrent stimulation. This can be useful in debugging where the
890 // spike raster comes from another implementation and one wants to
891 // get complete control of all input spikes without the confounding
892 // effects of output spikes from the simulated cells. In this case
893 // set the third arg to 1 and set the output cell thresholds very
894 // high so that they do not themselves generate spikes.
895 // The remaining possibility is fake_out=2 which only does a send for
896 // the gids owned by this cpu. This, followed by a nrn_spike_exchange(),
897 // ensures that all the target cells, regardless of what rank they are on
898 // will get the spike delivered and nobody gets it twice.
899 
900 void nrn_fake_fire(int gid, double spiketime, int fake_out) {
901  PreSyn* ps{nullptr};
902  if (fake_out < 2) {
903  auto iter = gid2in_.find(gid);
904  if (iter != gid2in_.end()) {
905  ps = iter->second;
906  // printf("nrn_fake_fire %d %g\n", gid, spiketime);
907  ps->send(spiketime, net_cvode_instance, nrn_threads);
908 #if NRNMPI
909  ++nrecv_useful_;
910 #endif
911  }
912  } else if (fake_out && !ps) {
913  auto iter = gid2out_.find(gid);
914  if (iter != gid2out_.end()) {
915  ps = iter->second;
916  // printf("nrn_fake_fire fake_out %d %g\n", gid, spiketime);
917  ps->send(spiketime, net_cvode_instance, nrn_threads);
918 #if NRNMPI
919  ++nrecv_useful_;
920 #endif
921  }
922  }
923 }
924 
925 static void alloc_space() {
926  if (!netcon_sym_) {
927  netcon_sym_ = hoc_lookup("NetCon");
928 #if NRNMPI
929  ocapacity_ = 100;
930  spikeout_ = (NRNMPI_Spike*) hoc_Emalloc(ocapacity_ * sizeof(NRNMPI_Spike));
931  hoc_malchk();
932  icapacity_ = 100;
934  hoc_malchk();
935  nin_ = (int*) hoc_Emalloc(nrnmpi_numprocs * sizeof(int));
936  hoc_malchk();
937 #if nrn_spikebuf_size > 0
938  spbufout_ = (NRNMPI_Spikebuf*) hoc_Emalloc(sizeof(NRNMPI_Spikebuf));
939  hoc_malchk();
940  spbufin_ = (NRNMPI_Spikebuf*) hoc_Emalloc(nrnmpi_numprocs * sizeof(NRNMPI_Spikebuf));
941  hoc_malchk();
942 #endif
943 #endif
944  }
945 }
946 
947 void BBS::set_gid2node(int gid, int nid) {
948  alloc_space();
949 #if NRNMPI
950  if (nid == nrnmpi_myid) {
951 #else
952  {
953 #endif
954  // printf("gid %d defined on %d\n", gid, nrnmpi_myid);
955  if (gid2in_.find(gid) != gid2in_.end()) {
957  "gid=%d already exists as an input port. Setup all the output ports on this "
958  "process before using them as input ports.",
959  gid);
960  }
961  if (gid2out_.find(gid) != gid2out_.end()) {
962  hoc_execerr_ext("gid=%d already exists on this process as an output port", gid);
963  }
964  gid2out_[gid] = nullptr;
965  }
966 }
967 
968 static int gid_donot_remove = 0; // avoid gid2in_, gid2out removal when iterating
969 
971 #if NRNMPI
973 #endif
974  if (gid_donot_remove) {
975  return;
976  }
977  if (ps->output_index_ >= 0) {
978  gid2out_.erase(ps->output_index_);
979  ps->output_index_ = -1;
980  ps->gid_ = -1;
981  }
982  if (ps->gid_ >= 0) {
983  gid2in_.erase(ps->gid_);
984  ps->gid_ = -1;
985  }
986 }
987 
988 void nrnmpi_gid_clear(int arg) {
989  if (arg == 0 || arg == 3 || arg == 4) {
991 #if NRNMPI
992  nrnmpi_split_clear();
993 #endif
994  }
995  if (arg == 0 || arg == 2 || arg == 4) {
997  }
998  if (arg == 2 || arg == 3) {
999  return;
1000  }
1001  gid_donot_remove = 1;
1002  for (const auto& iter: gid2out_) {
1003  PreSyn* ps = iter.second;
1004  if (ps && gid2in_.find(ps->gid_) == gid2in_.end()) {
1005  if (arg == 4) {
1006  delete ps;
1007  } else {
1008 #if NRNMPI
1010 #endif
1011  ps->gid_ = -1;
1012  ps->output_index_ = -1;
1013  if (ps->dil_.empty()) {
1014  delete ps;
1015  }
1016  }
1017  }
1018  }
1019  for (const auto& iter: gid2in_) {
1020  PreSyn* ps = iter.second;
1021  if (arg == 4) {
1022  delete ps;
1023  } else {
1024 #if NRNMPI
1026 #endif
1027  ps->gid_ = -1;
1028  ps->output_index_ = -1;
1029  if (ps->dil_.empty()) {
1030  delete ps;
1031  }
1032  }
1033  }
1034  gid_donot_remove = 0;
1035  gid2in_.clear();
1036  gid2out_.clear();
1037 }
1038 
1039 int nrn_gid_exists(int gid) {
1040  alloc_space();
1041  auto iter = gid2out_.find(gid);
1042  if (iter != gid2out_.end()) {
1043  PreSyn* ps = iter->second;
1044  // printf("%d gid %d exists\n", nrnmpi_myid, gid);
1045  if (ps) {
1046  return (ps->output_index_ >= 0 ? 3 : 2);
1047  } else {
1048  return 1;
1049  }
1050  }
1051  return 0;
1052 }
1053 int BBS::gid_exists(int gid) {
1054  return nrn_gid_exists(gid);
1055 }
1056 
1057 double BBS::threshold() {
1058  int gid = int(chkarg(1, 0., MD));
1059  auto iter = gid2out_.find(gid);
1060  if (iter == gid2out_.end() || iter->second == NULL) {
1061  hoc_execerror("gid not associated with spike generation location", 0);
1062  }
1063  PreSyn* ps = iter->second;
1064  if (ifarg(2)) {
1065  ps->threshold_ = *getarg(2);
1066  }
1067  return ps->threshold_;
1068 }
1069 
1070 void BBS::cell() {
1071  int gid = int(chkarg(1, 0., MD));
1072  alloc_space();
1073  if (gid2in_.find(gid) != gid2in_.end()) {
1075  "gid=%d is in the input list. Must register with pc.set_gid2node prior to connecting",
1076  gid);
1077  }
1078  if (gid2out_.find(gid) == gid2out_.end()) {
1079  hoc_execerr_ext("gid=%d has not been set on rank %d", gid, nrnmpi_myid);
1080  }
1081  Object* ob = *hoc_objgetarg(2);
1082  if (!ob || ob->ctemplate != netcon_sym_->u.ctemplate) {
1083  check_obj_type(ob, "NetCon");
1084  }
1085  NetCon* nc = (NetCon*) ob->u.this_pointer;
1086  PreSyn* ps = nc->src_;
1087  if (!ps) {
1088  hoc_execerr_ext("pc.cell second arg, %s, has no source", hoc_object_name(ob));
1089  }
1090  if (ps->gid_ >= 0 && ps->gid_ != gid) {
1091  hoc_execerr_ext("Can't associate gid %d. PreSyn already associated with gid %d.",
1092  gid,
1093  ps->gid_);
1094  }
1095  gid2out_[gid] = ps;
1096  ps->gid_ = gid;
1097  if (ifarg(3) && !chkarg(3, 0., 1.)) {
1098  ps->output_index_ = -2; // prevents destruction of PreSyn
1099  } else {
1100  ps->output_index_ = gid;
1101  }
1102 }
1103 
1104 void BBS::outputcell(int gid) {
1105  auto iter = gid2out_.find(gid);
1106  nrn_assert(iter != gid2out_.end());
1107  PreSyn* ps = iter->second;
1108  assert(ps);
1109  ps->output_index_ = gid;
1110  ps->gid_ = gid;
1111 }
1112 
1113 void BBS::spike_record(int gid, IvocVect* spikevec, IvocVect* gidvec) {
1114  if (gid >= 0) {
1115  all_spiketvec = NULL, all_spikegidvec = NULL; // invalidate global spike vectors
1116 
1117  auto iter = gid2out_.find(gid);
1118  nrn_assert(iter != gid2out_.end());
1119  PreSyn* ps = iter->second;
1120  assert(ps);
1121  ps->record(spikevec, gidvec, gid);
1122  } else { // record all output spikes.
1123  all_spiketvec = spikevec,
1124  all_spikegidvec = gidvec; // track global spike vectors (optimisation)
1125  for (const auto& iter: gid2out_) {
1126  PreSyn* ps = iter.second;
1127  if (ps->output_index_ >= 0) {
1129  }
1130  }
1131  }
1132 }
1133 
1134 void BBS::spike_record(IvocVect* gids, IvocVect* spikevec, IvocVect* gidvec) {
1135  int sz = vector_capacity(gids);
1136  // invalidate global spike vectors
1137  all_spiketvec = nullptr;
1138  all_spikegidvec = nullptr;
1139  double* pd = vector_vec(gids);
1140  for (int i = 0; i < sz; ++i) {
1141  int gid = int(pd[i]);
1142  auto iter = gid2out_.find(gid);
1143  nrn_assert(iter != gid2out_.end());
1144  PreSyn* ps = iter->second;
1145  assert(ps);
1146  ps->record(spikevec, gidvec, gid);
1147  }
1148 }
1149 
1150 static Object* gid2obj_(int gid) {
1151  Object* cell = 0;
1152  // printf("%d gid2obj gid=%d\n", nrnmpi_myid, gid);
1153  auto iter = gid2out_.find(gid);
1154  nrn_assert(iter != gid2out_.end());
1155  PreSyn* ps = iter->second;
1156  // printf(" found\n");
1157  assert(ps);
1158  cell = ps->ssrc_ ? nrn_sec2cell(ps->ssrc_) : ps->osrc_;
1159  // printf(" return %s\n", hoc_object_name(cell));
1160  return cell;
1161 }
1162 
1163 Object** BBS::gid2obj(int gid) {
1164  return hoc_temp_objptr(gid2obj_(gid));
1165 }
1166 
1167 Object** BBS::gid2cell(int gid) {
1168  Object* cell = 0;
1169  // printf("%d gid2obj gid=%d\n", nrnmpi_myid, gid);
1170  auto iter = gid2out_.find(gid);
1171  nrn_assert(iter != gid2out_.end());
1172  PreSyn* ps = iter->second;
1173  // printf(" found\n");
1174  assert(ps);
1175  if (ps->ssrc_) {
1176  cell = nrn_sec2cell(ps->ssrc_);
1177  } else {
1178  cell = ps->osrc_;
1179  // but if it is a POINT_PROCESS in a section
1180  // that is inside an object ... (probably has a WATCH statement)
1181  Section* sec = ob2pntproc(cell)->sec;
1182  Object* c2;
1183  if (sec && (c2 = nrn_sec2cell(sec))) {
1184  if (c2) {
1185  cell = c2;
1186  }
1187  }
1188  }
1189  // printf(" return %s\n", hoc_object_name(cell));
1190  return hoc_temp_objptr(cell);
1191 }
1192 
1194  Object* target = *hoc_objgetarg(2);
1195  if (!is_point_process(target)) {
1196  hoc_execerror("arg 2 must be a point process", 0);
1197  }
1198  alloc_space();
1199  PreSyn* ps;
1200  auto iter_out = gid2out_.find(gid);
1201  if (iter_out != gid2out_.end()) {
1202  // the gid is owned by this machine so connect directly
1203  ps = iter_out->second;
1204  if (!ps) {
1205  hoc_execerr_ext("gid %d owned by %d but no associated cell", gid, nrnmpi_myid);
1206  }
1207  } else {
1208  auto iter_in = gid2in_.find(gid);
1209  if (iter_in != gid2in_.end()) {
1210  // the gid stub already exists
1211  ps = iter_in->second;
1212  // printf("%d connect %s from already existing %d\n", nrnmpi_myid,
1213  // hoc_object_name(target), gid);
1214  } else {
1215  // printf("%d connect %s from new PreSyn for %d\n", nrnmpi_myid,
1216  // hoc_object_name(target), gid);
1217  ps = new PreSyn({}, nullptr, nullptr);
1219  gid2in_[gid] = ps;
1220  ps->gid_ = gid;
1221  }
1222  }
1223  NetCon* nc;
1224  Object** po;
1225  if (ifarg(3)) {
1226  po = hoc_objgetarg(3);
1227  if (!*po || (*po)->ctemplate != netcon_sym_->u.ctemplate) {
1228  check_obj_type(*po, "NetCon");
1229  }
1230  nc = (NetCon*) ((*po)->u.this_pointer);
1231  if (nc->target_ != ob2pntproc(target)) {
1232  hoc_execerror("target is different from 3rd arg NetCon target", 0);
1233  }
1234  nc->replace_src(ps);
1235  } else {
1236  nc = new NetCon(ps, target);
1237  po = hoc_temp_objvar(netcon_sym_, nc);
1238  nc->obj_ = *po;
1239  }
1240  return po;
1241 }
1242 
1243 static int timeout_ = 20;
1244 int nrn_set_timeout(int timeout) {
1245  int tt;
1246  tt = timeout_;
1247  timeout_ = timeout;
1248  return tt;
1249 }
1250 
1251 void BBS::netpar_solve(double tstop) {
1252  // temporary check to be eventually replaced by verify_structure()
1253  if (tree_changed) {
1254  setup_topology();
1255  }
1256  if (v_structure_change) {
1257  v_setup_vectors();
1258  }
1259  if (diam_changed) {
1260  recalc_diam();
1261  }
1262  // if cvode_active, and anything at all has changed, should call re_init
1263 
1264 #if NRNMPI
1265  double mt, md;
1266  tstopunset;
1267  if (cvode_active_) {
1268  mt = 1e-9;
1269  md = mindelay_;
1270  } else {
1271  mt = dt;
1272  md = mindelay_ - 1e-10;
1273  }
1274  if (md < mt) {
1275  if (nrnmpi_myid == 0) {
1276  hoc_execerror("mindelay is 0", "(or less than dt for fixed step method)");
1277  } else {
1278  return;
1279  }
1280  }
1281  double wt;
1282 
1283  nrnmpi_barrier(); // make sure all integrations start about the same
1284  nrn_timeout(timeout_); // time to avoid spurious timeouts while waiting
1285  // at the next MPI_collective.
1286  wt = nrnmpi_wtime();
1287  if (cvode_active_) {
1288  ncs2nrn_integrate(tstop);
1289  } else {
1290  ncs2nrn_integrate(tstop * (1. + 1e-11));
1291  }
1292  impl_->integ_time_ += nrnmpi_wtime() - wt;
1293  impl_->integ_time_ -= (npe_ ? (npe_[0].wx_ + npe_[0].ws_) : 0.);
1294 #if NRNMPI
1295  if (use_multisend_) {
1296  for (int i = 0; i < n_multisend_interval; ++i) {
1298  }
1299  } else {
1301  }
1302 #else
1304 #endif
1305  nrn_timeout(0);
1306  impl_->wait_time_ += wt_;
1307  impl_->send_time_ += wt1_;
1308  if (npe_) {
1309  impl_->wait_time_ += npe_[0].wx_;
1310  impl_->send_time_ += npe_[0].ws_;
1311  npe_[0].wx_ = npe_[0].ws_ = 0.;
1312  };
1313 // printf("%d netpar_solve exit t=%g tstop=%g mindelay_=%g\n",nrnmpi_myid, t, tstop, mindelay_);
1314 #else // not NRNMPI
1315  ncs2nrn_integrate(tstop);
1316 #endif
1317  tstopunset;
1318 }
1319 
1320 int nrnthread_all_spike_vectors_return(std::vector<double>& spiketvec,
1321  std::vector<int>& spikegidvec) {
1322  assert(spiketvec.size() == spikegidvec.size());
1323  if (spiketvec.size()) {
1324  /* Optimisation:
1325  * if all_spiketvec and all_spikegidvec are set (pc.spike_record with gid=-1 was called)
1326  * and they are still reference counted (obj_->refcount), then copy over incoming vectors.
1327  */
1328  if (all_spiketvec != NULL && all_spiketvec->obj_ != NULL &&
1331  all_spiketvec->buffer_size(spiketvec.size() + all_spiketvec->size());
1332  all_spikegidvec->buffer_size(spikegidvec.size() + all_spikegidvec->size());
1333  all_spiketvec->vec().insert(all_spiketvec->end(), spiketvec.begin(), spiketvec.end());
1334  all_spikegidvec->vec().insert(all_spikegidvec->end(),
1335  spikegidvec.begin(),
1336  spikegidvec.end());
1337 
1338  } else { // different underlying vectors for PreSyns
1339  for (size_t i = 0; i < spikegidvec.size(); ++i) {
1340  auto iter = gid2out_.find(spikegidvec[i]);
1341  if (iter != gid2out_.end()) {
1342  PreSyn* ps = iter->second;
1343  ps->record(spiketvec[i]);
1344  }
1345  }
1346  }
1347  }
1348  return 1;
1349 }
1350 
1351 
1352 static double set_mindelay(double maxdelay) {
1353  double mindelay = maxdelay;
1354  last_maxstep_arg_ = maxdelay;
1356  if (net_cvode_instance->psl_)
1357  for (PreSyn* ps: *net_cvode_instance->psl_) {
1358  double md = ps->mindelay();
1359  if (mindelay > md) {
1360  mindelay = md;
1361  }
1362  }
1363  }
1364 #if NRNMPI
1365  else {
1366  for (const auto& iter: gid2in_) {
1367  PreSyn* ps = iter.second;
1368  double md = ps->mindelay();
1369  if (mindelay > md) {
1370  mindelay = md;
1371  }
1372  }
1373  }
1374  if (nrnmpi_use) {
1375  active_ = 1;
1376  }
1377  if (use_compress_) {
1378  if (mindelay / dt > 255) {
1379  mindelay = 255 * dt;
1380  }
1381  }
1382 
1383  // printf("%d netpar_mindelay local %g now calling nrnmpi_mindelay\n", nrnmpi_myid, mindelay);
1384  // double st = time();
1385  mindelay_ = nrnmpi_mindelay(mindelay);
1387  // add_wait_time(st);
1388  // printf("%d local min=%g global min=%g\n", nrnmpi_myid, mindelay, mindelay_);
1389  if (mindelay_ < 1e-9 && nrn_use_selfqueue_) {
1390  nrn_use_selfqueue_ = 0;
1391  double od = mindelay_;
1392  mindelay = set_mindelay(maxdelay);
1393  if (nrnmpi_myid == 0) {
1394  Printf(
1395  "Notice: The global minimum NetCon delay is %g, so turned off the "
1396  "cvode.queue_mode\n",
1397  od);
1398  Printf(" use_self_queue option. The interprocessor minimum NetCon delay is %g\n",
1399  mindelay);
1400  }
1401  }
1402  errno = 0;
1403  return mindelay;
1404 #else
1405  mindelay_ = mindelay;
1407  return mindelay;
1408 #endif // NRNMPI
1409 }
1410 
1411 double BBS::netpar_mindelay(double maxdelay) {
1412 #if NRNMPI
1414 #endif
1415  double tt = set_mindelay(maxdelay);
1416  return tt;
1417 }
1418 
1419 void BBS::netpar_spanning_statistics(int* nsend, int* nsendmax, int* nrecv, int* nrecv_useful) {
1420 #if NRNMPI
1421  *nsend = nsend_;
1422  *nsendmax = nsendmax_;
1423  *nrecv = nrecv_;
1424  *nrecv_useful = nrecv_useful_;
1425 #endif
1426 }
1427 
1428 // unfortunately, ivocvect.h conflicts with STL
1430 #if NRNMPI
1431  IvocVect* h = max_histogram_;
1432  if (max_histogram_) {
1433  max_histogram_ = NULL;
1434  }
1435  if (mh) {
1436  max_histogram_ = mh;
1437  }
1438  return h;
1439 #else
1440  return nullptr;
1441 #endif
1442 }
1443 
1444 /* 08-Nov-2010
1445 The workhorse for spike exchange on up to 10K machines is MPI_Allgather
1446 but as the number of machines becomes far greater than the fanout per
1447 cell we have been exploring a class of exchange methods called multisend
1448 where the spikes only go to those machines that need them and there is
1449 overlap between communication and computation. The numer of variants of
1450 multisend has grown so that some method selection function is needed
1451 that makes sense.
1452 
1453 The situation that needs to be captured by xchng_meth is
1454 
1455 0: Allgather
1456 1: multisend implemented as MPI_ISend
1457 
1458 n_multisend_interval 1 or 2 per minimum interprocessor NetCon delay
1459  that concept valid for all methods
1460 
1461 Note that Allgather allows spike compression and an allgather spike buffer
1462  with size chosen at setup time. All methods allow bin queueing.
1463 
1464 The multisend methods should allow two phase multisend.
1465 
1466 Note that, in principle, MPI_ISend allows the source to send the index
1467  of the target PreSyn to avoid a hash table lookup (even with a two phase
1468  variant)
1469 
1470 Not all variation are useful. e.g. it is pointless to combine Allgather and
1471 n_multisend_interval=2.
1472 The whole point is to make the
1473 spike transfer initiation as lowcost as possible since that is what causes
1474 most load imbalance. I.e. since 10K more spikes arrive than are sent, spikes
1475 received per processor per interval are much more statistically
1476 balanced than spikes sent per processor per interval. And presently
1477 DCMF multisend injects 10000 messages per spike into the network which
1478 is quite expensive.
1479 
1480 See case 8 of nrn_multisend_receive_time for the xchng_meth properties
1481 */
1482 
1483 int nrnmpi_spike_compress(int nspike, bool gid_compress, int xchng_meth) {
1484 #if NRNMPI
1485  if (nrnmpi_numprocs < 2) {
1486  return 0;
1487  }
1488  if (nspike >= 0) { // otherwise don't set any multisend properties
1489  n_multisend_interval = (xchng_meth & 4) ? 2 : 1;
1490  use_multisend_ = (xchng_meth & 1) == 1;
1491  use_phase2_ = (xchng_meth & 8) ? 1 : 0;
1492  if (use_multisend_) {
1493  assert(NRNMPI);
1494  }
1496  }
1497  if (nspike >= 0) {
1498  ag_send_nspike_ = 0;
1499  if (spfixout_) {
1500  free(spfixout_);
1501  spfixout_ = 0;
1502  }
1503  if (spfixin_) {
1504  free(spfixin_);
1505  spfixin_ = 0;
1506  }
1507  if (spfixin_ovfl_) {
1508  free(spfixin_ovfl_);
1509  spfixin_ovfl_ = 0;
1510  }
1511  localmaps_.clear();
1512  }
1513  if (nspike == 0) { // turn off
1514  use_compress_ = false;
1515  nrn_use_localgid_ = false;
1516  } else if (nspike > 0) { // turn on
1517  if (cvode_active_) {
1518  if (nrnmpi_myid == 0) {
1519  hoc_warning("ParallelContext.spike_compress cannot be used with cvode active", 0);
1520  }
1521  use_compress_ = false;
1522  nrn_use_localgid_ = false;
1523  return 0;
1524  }
1525  use_compress_ = true;
1526  ag_send_nspike_ = nspike;
1527  nrn_use_localgid_ = false;
1528  if (gid_compress) {
1529  // we can only do this after everything is set up
1530  mk_localgid_rep();
1531  if (!nrn_use_localgid_ && nrnmpi_myid == 0) {
1532  Printf(
1533  "Notice: gid compression did not succeed. Probably more than 255 cells on one "
1534  "cpu.\n");
1535  }
1536  }
1537  if (!nrn_use_localgid_) {
1538  localgid_size_ = sizeof(unsigned int);
1539  }
1541  spfixout_capacity_ = ag_send_size_ + 50 * (1 + localgid_size_);
1542  spfixout_ = (unsigned char*) hoc_Emalloc(spfixout_capacity_);
1543  hoc_malchk();
1544  spfixin_ = (unsigned char*) hoc_Emalloc(nrnmpi_numprocs * ag_send_size_);
1545  hoc_malchk();
1546  ovfl_capacity_ = 100;
1547  spfixin_ovfl_ = (unsigned char*) hoc_Emalloc(ovfl_capacity_ * (1 + localgid_size_));
1548  hoc_malchk();
1549  }
1550  return ag_send_nspike_;
1551 #else
1552  return 0;
1553 #endif
1554 }
1555 
1556 PreSyn* nrn_gid2outputpresyn(int gid) { // output PreSyn
1557  auto iter = gid2out_.find(gid);
1558  if (iter != gid2out_.end()) {
1559  return iter->second;
1560  }
1561  return NULL;
1562 }
1563 
1564 Object* nrn_gid2obj(int gid) {
1565  return gid2obj_(gid);
1566 }
1567 
1568 PreSyn* nrn_gid2presyn(int gid) { // output PreSyn
1569  auto iter = gid2out_.find(gid);
1570  nrn_assert(iter != gid2out_.end());
1571  return iter->second;
1572 }
1573 
1574 void nrn_gidout_iter(PFIO callback) {
1575  for (const auto& iter: gid2out_) {
1576  PreSyn* ps = iter.second;
1577  if (ps) {
1578  int gid = ps->gid_;
1579  Object* c = gid2obj_(gid);
1580  (*callback)(gid, c);
1581  }
1582  }
1583 }
1584 
1585 #include "nrncore_write.h"
1586 extern int* nrn_prop_param_size_;
1587 extern short* nrn_is_artificial_;
1588 static int weightcnt(NetCon* nc) {
1589  return nc->cnt_;
1590  // return nc->target_ ? pnt_receive_size[nc->target_->prop->_type]: 1;
1591 }
1592 
1594  size_t ntot, nin, nout, nnet, nweight;
1595  ntot = nin = nout = nnet = nweight = 0;
1596  size_t npnt = 0;
1597  if (0 && nrnmpi_myid == 0) {
1598  printf("size Presyn=%ld NetCon=%ld Point_process=%ld Prop=%ld\n",
1599  sizeof(PreSyn),
1600  sizeof(NetCon),
1601  sizeof(Point_process),
1602  sizeof(Prop));
1603  }
1604  for (const auto& iter: gid2out_) {
1605  PreSyn* ps = iter.second;
1606  if (ps) {
1607  nout += 1;
1608  int n = ps->dil_.size();
1609  nnet += n;
1610  for (auto nc: ps->dil_) {
1611  nweight += weightcnt(nc);
1612  if (nc->target_) {
1613  npnt += 1;
1614  }
1615  }
1616  }
1617  }
1618  // printf("output Presyn npnt = %ld nweight = %ld\n", npnt, nweight);
1619  for (const auto& iter: gid2in_) {
1620  PreSyn* ps = iter.second;
1621  if (ps) {
1622  nin += 1;
1623  int n = ps->dil_.size();
1624  nnet += n;
1625  for (auto nc: ps->dil_) {
1626  nweight += weightcnt(nc);
1627  if (nc->target_) {
1628  npnt += 1;
1629  }
1630  }
1631  }
1632  }
1633  // printf("after input Presyn total npnt = %ld nweight = %ld\n", npnt, nweight);
1634  ntot = (nin + nout) * sizeof(PreSyn) + nnet * sizeof(NetCon) + nweight * sizeof(double);
1635  // printf("%d rank output Presyn %ld input Presyn %ld NetCon %ld bytes %ld\n",
1636  // nrnmpi_myid, nout, nin, nnet, ntot);
1637  return ntot;
1638 }
static void nrnmpi_int_allgather(int *s, int *r, int n)
static void nrnmpi_barrier()
static int nrnmpi_int_allmax(int x)
void setup_topology(void)
Definition: cabcode.cpp:1635
int tree_changed
Definition: cabcode.cpp:51
Object ** gid2cell(int)
Definition: netpar.cpp:1167
void spike_record(int, IvocVect *, IvocVect *)
Definition: netpar.cpp:1113
Object ** gid_connect(int)
Definition: netpar.cpp:1193
void set_gid2node(int, int)
Definition: netpar.cpp:947
void netpar_solve(double)
Definition: netpar.cpp:1251
Object ** gid2obj(int)
Definition: netpar.cpp:1163
IvocVect * netpar_max_histogram(IvocVect *)
Definition: netpar.cpp:1429
int gid_exists(int)
Definition: netpar.cpp:1053
void cell()
Definition: netpar.cpp:1070
void netpar_spanning_statistics(int *, int *, int *, int *)
Definition: netpar.cpp:1419
BBSImpl * impl_
Definition: bbs.h:73
void outputcell(int)
Definition: netpar.cpp:1104
double netpar_mindelay(double maxdelay)
Definition: netpar.cpp:1411
double threshold()
Definition: netpar.cpp:1057
double send_time_
Definition: bbsimpl.h:58
double integ_time_
Definition: bbsimpl.h:57
double wait_time_
Definition: bbsimpl.h:56
size_t size() const
Definition: ivocvect.h:42
int buffer_size()
Definition: ivocvect.cpp:1367
auto end() const -> std::vector< double >::const_iterator
Definition: ivocvect.h:68
std::vector< double > & vec()
Definition: ivocvect.h:30
Object * obj_
Definition: ivocvect.h:101
Definition: netcon.h:87
void replace_src(PreSyn *)
Definition: netcvode.cpp:4768
bool active_
Definition: netcon.h:119
int cnt_
Definition: netcon.h:118
double delay_
Definition: netcon.h:113
Point_process * target_
Definition: netcon.h:115
Object * obj_
Definition: netcon.h:117
PreSyn * src_
Definition: netcon.h:114
std::vector< PreSyn * > * psl_
Definition: netcvode.h:246
TQItem * bin_event(double tdeliver, DiscreteEvent *, NrnThread *)
Definition: netcvode.cpp:2498
void deliver_events(double til, NrnThread *)
Definition: netcvode.cpp:2835
void localstep(bool)
Definition: netcvode.cpp:1176
void psl_append(PreSyn *)
Definition: netcvode.cpp:4624
TQItem * event(double tdeliver, DiscreteEvent *, NrnThread *)
Definition: netcvode.cpp:2522
virtual ~NetParEvent()
Definition: netpar.cpp:235
virtual void savestate_write(FILE *)
Definition: netpar.cpp:303
virtual void pgvts_deliver(double t, NetCvode *)
Definition: netpar.cpp:276
virtual DiscreteEvent * savestate_save()
Definition: netpar.cpp:285
static DiscreteEvent * savestate_read(FILE *)
Definition: netpar.cpp:292
int ithread_
Definition: netcon.h:413
double wx_
Definition: netcon.h:412
virtual void send(double, NetCvode *, NrnThread *)
Definition: netpar.cpp:236
virtual void savestate_restore(double deliverytime, NetCvode *)
Definition: netpar.cpp:308
virtual void deliver(double, NetCvode *, NrnThread *)
Definition: netpar.cpp:241
double ws_
Definition: netcon.h:412
virtual void pr(const char *, double t, NetCvode *)
Definition: netpar.cpp:281
Definition: netcon.h:258
int output_index_
Definition: netcon.h:308
virtual void send(double sendtime, NetCvode *, NrnThread *)
Definition: netcvode.cpp:3016
double mindelay()
Definition: netcvode.cpp:2825
Section * ssrc_
Definition: netcon.h:299
PreSyn(neuron::container::data_handle< double > src, Object *osrc, Section *ssrc=nullptr)
Definition: netcvode.cpp:4930
void record(IvocVect *, IvocVect *idvec=nullptr, int rec_id=0)
Definition: netcvode.cpp:5088
int gid_
Definition: netcon.h:309
Object * osrc_
Definition: netcon.h:298
double threshold_
Definition: netcon.h:295
NetConPList dil_
Definition: netcon.h:294
#define nrn_spikebuf_size
Definition: nrnmpi.h:18
#define sec
Definition: md1redef.h:20
#define i
Definition: md1redef.h:19
ms
Definition: extargs.h:1
double chkarg(int, double low, double high)
Definition: code2.cpp:626
static RNG::key_type k
Definition: nrnran123.cpp:9
char buf[512]
Definition: init.cpp:13
void hoc_execerr_ext(const char *fmt,...)
printf style specification of hoc_execerror message.
Definition: fileio.cpp:828
Object ** hoc_temp_objvar(Symbol *symtemp, void *v)
Definition: hoc_oop.cpp:484
void check_obj_type(Object *obj, const char *type_name)
Definition: hoc_oop.cpp:2098
char * hoc_object_name(Object *ob)
Definition: hoc_oop.cpp:73
Symbol * hoc_lookup(const char *)
Definition: symbol.cpp:59
static int c
Definition: hoc.cpp:169
#define assert(ex)
Definition: hocassrt.h:24
#define getarg
Definition: hocdec.h:17
Point_process * ob2pntproc(Object *ob)
Definition: hocmech.cpp:99
Object ** hoc_objgetarg(int)
Definition: code.cpp:1614
static double map(void *v)
Definition: mlinedit.cpp:43
floor
Definition: extdef.h:4
printf
Definition: extdef.h:5
static double nrnmpi_wtime()
Definition: multisplit.cpp:48
static int nrnmpi_use
Definition: multisplit.cpp:41
phase
Reading phase number.
Definition: nrn_setup.hpp:53
NrnThread * nrn_threads
Definition: multicore.cpp:56
double * vector_vec(IvocVect *v)
Definition: ivocvect.cpp:19
void hoc_malchk(void)
Definition: nrnoc_aux.cpp:83
int nrn_nthread
Definition: multicore.cpp:55
void nrn2ncs_outputevent(int netcon_output_index, double firetime)
bool use_multisend_
Definition: multisend.cpp:53
bool cvode_active_
Definition: netcvode.cpp:36
int v_structure_change
Definition: nrnoc_aux.cpp:20
void hoc_execerror(const char *s1, const char *s2)
Definition: nrnoc_aux.cpp:39
void nrn_outputevent(unsigned char, double)
void nrn_multisend_receive(NrnThread *)
bool nrn_use_localgid_
int vector_capacity(IvocVect *v)
Definition: ivocvect.cpp:16
void hoc_warning(const char *s1, const char *s2)
Definition: nrnoc_aux.cpp:44
void * hoc_Emalloc(size_t)
Definition: nrnoc_aux.cpp:80
int diam_changed
Definition: nrnoc_aux.cpp:21
if(ncell==0)
Definition: cellorder.cpp:785
#define NetParEventType
Definition: netcon.hpp:27
void v_setup_vectors()
Definition: treeset.cpp:1596
int is_point_process(Object *)
Definition: point.cpp:370
void recalc_diam(void)
Definition: treeset.cpp:923
#define nrn_assert(x)
assert()-like macro, independent of NDEBUG status
Definition: nrn_assert.h:33
#define tstopunset
Definition: nrnconf.h:45
int const size_t const size_t n
Definition: nrngsl.h:10
size_t j
void nrn_multisend_setup()
Definition: multisend.cpp:617
TBUF void nrn_multisend_send(PreSyn *ps, double t)
Definition: multisend.cpp:541
void nrn_multisend_cleanup_presyn(PreSyn *ps)
Definition: multisend.cpp:553
wt1_
Definition: multisend.cpp:531
static void nrn_multisend_init()
Definition: multisend.cpp:375
static void nrn_multisend_cleanup()
Definition: multisend.cpp:566
wt_
Definition: multisend.cpp:532
static int use_phase2_
Definition: multisend.cpp:132
void nrn_partrans_clear()
Definition: partrans.cpp:768
static IvocVect * all_spikegidvec
Definition: netpar.cpp:43
double dt
Definition: netpar.cpp:62
static int n_npe_
Definition: netpar.cpp:210
Gid2PreSyn & nrn_gid2out()
Definition: netpar.cpp:218
short * nrn_is_artificial_
Definition: init.cpp:214
static int active_
Definition: netpar.cpp:204
void nrn_gidout_iter(PFIO)
Definition: netpar.cpp:1574
static int * nin_
Definition: netpar.cpp:55
static double t_exchange_
Definition: netpar.cpp:44
static NRNMPI_Spike * spikein_
Definition: netpar.cpp:57
static int icapacity_
Definition: netpar.cpp:58
static int timeout_
Definition: netpar.cpp:1243
static Gid2PreSyn gid2out_
Definition: netpar.cpp:40
double nrn_multisend_receive_time(int)
Definition: netpar.cpp:92
static int localgid_size_
Definition: netpar.cpp:46
static Symbol * netcon_sym_
Definition: netpar.cpp:39
static int weightcnt(NetCon *nc)
Definition: netpar.cpp:1588
static Object * gid2obj_(int gid)
Definition: netpar.cpp:1150
double nrnmpi_step_wait_
Definition: ocbbs.cpp:34
int nrnthread_all_spike_vectors_return(std::vector< double > &spiketvec, std::vector< int > &spikegidvec)
NEURON callback used from CORENEURON to transfer all spike vectors after simulation.
Definition: netpar.cpp:1320
void ncs2nrn_integrate(double tstop)
Definition: netcvode.cpp:3700
void nrn_cleanup_presyn(PreSyn *)
Definition: netpar.cpp:970
static Gid2PreSyn gid2in_
Definition: netpar.cpp:41
static int n_multisend_interval
Definition: netpar.cpp:32
static NRNMPI_Spike * spikeout_
Definition: netpar.cpp:56
static int ovfl_capacity_
Definition: netpar.cpp:49
static int ovfl_
Definition: netpar.cpp:50
void nrn_spike_exchange_init()
Definition: netpar.cpp:463
void nrn_spike_exchange(NrnThread *)
Definition: netpar.cpp:552
static double min_interprocessor_delay_
Definition: netpar.cpp:206
double nrn_usable_mindelay()
Definition: netpar.cpp:212
PreSyn * nrn_gid2outputpresyn(int gid)
Definition: netpar.cpp:1556
static unsigned char * spfixout_
Definition: netpar.cpp:51
void nrn_fake_fire(int gid, double spiketime, int fake_out)
Definition: netpar.cpp:900
static double mindelay_
Definition: netpar.cpp:207
int nrn_set_timeout(int)
Definition: netpar.cpp:1244
PreSyn * nrn_gid2presyn(int)
Definition: netpar.cpp:1568
void nrnmpi_multisplit_clear()
Definition: multisplit.cpp:499
std::unordered_map< int, PreSyn * > Gid2PreSyn
Definition: netpar.cpp:21
int * nrn_prop_param_size_
Definition: init.cpp:162
double t
Definition: cvodeobj.cpp:57
static int nrn_need_npe()
Definition: netpar.cpp:418
static double set_mindelay(double maxdelay)
Definition: netpar.cpp:1352
static NetParEvent * npe_
Definition: netpar.cpp:209
int nrn_gid_exists(int)
Definition: netpar.cpp:1039
static double dt1_
Definition: netpar.cpp:45
static int nout_
Definition: netpar.cpp:54
#define TBUF
Definition: netpar.cpp:460
static int gid_donot_remove
Definition: netpar.cpp:968
void(* PFIO)(int, Object *)
Definition: netpar.cpp:74
static unsigned char * spfixin_
Definition: netpar.cpp:52
static int ag_send_nspike_
Definition: netpar.cpp:48
static double last_maxstep_arg_
Definition: netpar.cpp:208
static double usable_mindelay_
Definition: netpar.cpp:205
void nrnmpi_gid_clear(int)
Definition: netpar.cpp:988
#define MD
Definition: netpar.cpp:16
int nrnmpi_spike_compress(int nspike, bool gid_compress, int xchng_meth)
Definition: netpar.cpp:1483
static void alloc_space()
Definition: netpar.cpp:925
Object * nrn_sec2cell(Section *)
Definition: cabcode.cpp:252
Symbol * nrn_netcon_sym()
Definition: netpar.cpp:215
static unsigned char * spfixin_ovfl_
Definition: netpar.cpp:53
NetCvode * net_cvode_instance
Definition: cvodestb.cpp:26
static IvocVect * all_spiketvec
Definition: netpar.cpp:42
static void calc_actual_mindelay()
Definition: netpar.cpp:444
int nrn_use_selfqueue_
Definition: netcvode.cpp:77
size_t nrncore_netpar_bytes()
Definition: netpar.cpp:1593
static int ag_send_size_
Definition: netpar.cpp:47
Object * nrn_gid2obj(int)
Definition: netpar.cpp:1564
void nrn_pending_selfqueue(double, NrnThread *)
Definition: netcvode.cpp:3759
int ifarg(int)
Definition: code.cpp:1607
#define NRNMPI
Definition: nrnmpiuse.h:14
void nrnmusic_runtime_phase()
Definition: nrnmusic.cpp:231
int nrnmusic
#define MUTCONSTRUCT(mkmut)
Definition: nrnmutdec.h:33
#define MUTDESTRUCT
Definition: nrnmutdec.h:34
#define MUTDEC
Definition: nrnmutdec.h:31
#define MUTLOCK
Definition: nrnmutdec.h:35
#define MUTUNLOCK
Definition: nrnmutdec.h:36
void * hoc_Erealloc(void *ptr, std::size_t size)
Definition: memory.cpp:41
int nrnmpi_myid
#define NULL
Definition: spdefs.h:105
Object ** hoc_temp_objptr(Object *)
Definition: code.cpp:151
double spiketime
Definition: nrnmpi.h:18
int gid
Definition: nrnmpi.h:17
Represent main neuron object computed by single thread.
Definition: multicore.h:58
int _stop_stepping
Definition: multicore.h:67
double _t
Definition: multicore.h:59
Definition: hocdec.h:173
void * this_pointer
Definition: hocdec.h:178
int refcount
Definition: hocdec.h:174
cTemplate * ctemplate
Definition: hocdec.h:180
union Object::@47 u
A point process is computed just like regular mechanisms.
Definition: section_fwd.hpp:77
Section * sec
Definition: section_fwd.hpp:78
Definition: section.h:231
Definition: model.h:47
union Symbol::@28 u
cTemplate * ctemplate
Definition: hocdec.h:126
int Printf(const char *fmt, Args... args)
Definition: logger.hpp:18