1 #include <../../nrnconf.h>
10 #if NRNMPI_DYNAMICLOAD
11 #include <nrnmpi_dynam.h>
18 #include "nrnmpidec.h"
26 #define nrn_mpi_assert(arg) nrn_assert(arg == MPI_SUCCESS)
28 extern void nrnbbs_context_wait();
35 static void pgvts_op(
double* in,
double* inout,
int* len, MPI_Datatype* dptr);
36 static MPI_Op mpi_pgvts_op;
38 static void make_spike_type() {
41 MPI_Aint displacements[2];
42 MPI_Aint addresses[3];
43 MPI_Datatype typelist[2];
45 typelist[0] = MPI_INT;
46 typelist[1] = MPI_DOUBLE;
48 block_lengths[0] = block_lengths[1] = 1;
50 MPI_Get_address(&
s, &addresses[0]);
51 MPI_Get_address(&(
s.gid), &addresses[1]);
52 MPI_Get_address(&(
s.spiketime), &addresses[2]);
54 displacements[0] = addresses[1] - addresses[0];
55 displacements[1] = addresses[2] - addresses[0];
57 MPI_Type_create_struct(2, block_lengths, displacements, typelist, &
spike_type);
60 MPI_Op_create((MPI_User_function*) pgvts_op, 1, &mpi_pgvts_op);
67 #if nrn_spikebuf_size > 0
69 static MPI_Datatype spikebuf_type;
71 static void make_spikebuf_type(
int*
nout_) {
74 MPI_Aint displacements[3];
75 MPI_Aint addresses[4];
76 MPI_Datatype typelist[3];
78 typelist[0] = MPI_INT;
79 typelist[1] = MPI_INT;
80 typelist[2] = MPI_DOUBLE;
86 MPI_Get_address(&
s, &addresses[0]);
87 MPI_Get_address(&(
s.nspike), &addresses[1]);
88 MPI_Get_address(&(
s.gid[0]), &addresses[2]);
89 MPI_Get_address(&(
s.spiketime[0]), &addresses[3]);
91 displacements[0] = addresses[1] - addresses[0];
92 displacements[1] = addresses[2] - addresses[0];
93 displacements[2] = addresses[3] - addresses[0];
95 MPI_Type_create_struct(3, block_lengths, displacements, typelist, &spikebuf_type);
96 MPI_Type_commit(&spikebuf_type);
100 int nrnmpi_spike_exchange(
int* ovfl,
112 #if nrn_spikebuf_size > 0
113 make_spikebuf_type(
nout_);
116 nrnbbs_context_wait();
117 #if nrn_spikebuf_size == 0
120 for (
i = 1;
i <
np; ++
i) {
135 MPI_Allgather(spbufout_, 1, spikebuf_type, spbufin_, 1, spikebuf_type,
nrnmpi_comm);
137 n = spbufin_[0].nspike;
144 for (
i = 1;
i <
np; ++
i) {
146 int n1 = spbufin_[
i].nspike;
188 int nrnmpi_spike_exchange_compressed(
int localgid_size,
193 unsigned char* spfixout,
194 unsigned char* spfixin,
195 unsigned char** spfixin_ovfl,
197 int i, novfl,
n, ntot, idx, bs, bstot;
208 nrnbbs_context_wait();
210 MPI_Allgather(spfixout, ag_send_size, MPI_BYTE, spfixin, ag_send_size, MPI_BYTE,
nrnmpi_comm);
214 for (
i = 0;
i <
np; ++
i) {
216 idx =
i * ag_send_size;
217 n = spfixin[idx++] * 256;
221 if (
n > ag_send_nspike) {
222 bs = 2 +
n * (1 + localgid_size) - ag_send_size;
225 novfl +=
n - ag_send_nspike;
231 if (*ovfl_capacity < novfl) {
232 *ovfl_capacity = novfl + 10;
234 *spfixin_ovfl = (
unsigned char*)
hoc_Emalloc(*ovfl_capacity * (1 + localgid_size) *
235 sizeof(
unsigned char));
245 MPI_Allgatherv(spfixout + ag_send_size,
258 double nrnmpi_mindelay(
double m) {
263 nrnbbs_context_wait();
273 nrnbbs_context_wait();
278 #define ALLTOALLV_SPARSE_TAG 101980
282 static int MPI_Alltoallv_sparse(
void* sendbuf,
285 MPI_Datatype sendtype,
289 MPI_Datatype recvtype,
293 nrn_mpi_assert(MPI_Comm_rank(comm, &myrank));
294 nrn_mpi_assert(MPI_Comm_size(comm, &nranks));
297 for (rankp = 0; nranks > (1 << rankp); rankp++)
301 ptrdiff_t send_elsize;
302 ptrdiff_t recv_elsize;
304 nrn_mpi_assert(MPI_Type_get_extent(sendtype, &lb, &send_elsize));
305 nrn_mpi_assert(MPI_Type_get_extent(recvtype, &lb, &recv_elsize));
307 MPI_Request* requests = (MPI_Request*)
hoc_Emalloc(nranks * 2 *
sizeof(MPI_Request));
314 for (ngrp = 0; ngrp < (1 << rankp); ngrp++) {
315 int target = myrank ^ ngrp;
317 if (target >= nranks)
319 if (recvcnts[target] == 0)
321 nrn_mpi_assert(MPI_Irecv((
static_cast<char*
>(recvbuf)) + recv_elsize * rdispls[target],
325 ALLTOALLV_SPARSE_TAG,
327 &requests[n_requests++]));
330 nrn_mpi_assert(MPI_Barrier(comm));
332 for (ngrp = 0; ngrp < (1 << rankp); ngrp++) {
333 int target = myrank ^ ngrp;
334 if (target >= nranks)
336 if (sendcnts[target] == 0)
338 nrn_mpi_assert(MPI_Isend((
static_cast<char*
>(sendbuf)) + send_elsize * sdispls[target],
342 ALLTOALLV_SPARSE_TAG,
344 &requests[n_requests++]));
347 nrn_mpi_assert(MPI_Waitall(n_requests, requests, MPI_STATUSES_IGNORE));
350 nrn_mpi_assert(MPI_Barrier(comm));
356 extern void nrnmpi_dbl_alltoallv_sparse(
double*
s,
362 MPI_Alltoallv_sparse(
s, scnt, sdispl, MPI_DOUBLE, r, rcnt, rdispl, MPI_DOUBLE,
nrnmpi_comm);
364 extern void nrnmpi_int_alltoallv_sparse(
int*
s,
370 MPI_Alltoallv_sparse(
s, scnt, sdispl, MPI_INT, r, rcnt, rdispl, MPI_INT,
nrnmpi_comm);
373 extern void nrnmpi_long_alltoallv_sparse(int64_t*
s,
379 MPI_Alltoallv_sparse(
s, scnt, sdispl, MPI_INT64_T, r, rcnt, rdispl, MPI_INT64_T,
nrnmpi_comm);
383 extern void nrnmpi_int_gather(
int*
s,
int* r,
int cnt,
int root) {
387 extern void nrnmpi_int_gatherv(
int*
s,
int scnt,
int* r,
int* rcnt,
int* rdispl,
int root) {
391 extern void nrnmpi_char_gatherv(
char*
s,
int scnt,
char* r,
int* rcnt,
int* rdispl,
int root) {
392 MPI_Gatherv(
s, scnt, MPI_CHAR, r, rcnt, rdispl, MPI_CHAR,
root,
nrnmpi_comm);
395 extern void nrnmpi_int_scatter(
int*
s,
int* r,
int cnt,
int root) {
399 extern void nrnmpi_char_scatterv(
char*
s,
int* scnt,
int* sdispl,
char* r,
int rcnt,
int root) {
400 MPI_Scatterv(
s, scnt, sdispl, MPI_CHAR, r, rcnt, MPI_CHAR,
root,
nrnmpi_comm);
403 extern void nrnmpi_int_alltoall(
int*
s,
int* r,
int n) {
413 MPI_Alltoallv(
s, scnt, sdispl, MPI_INT, r, rcnt, rdispl, MPI_INT,
nrnmpi_comm);
416 extern void nrnmpi_long_alltoallv(int64_t*
s,
422 MPI_Alltoallv(
s, scnt, sdispl, MPI_INT64_T, r, rcnt, rdispl, MPI_INT64_T,
nrnmpi_comm);
431 MPI_Alltoallv(
s, scnt, sdispl, MPI_DOUBLE, r, rcnt, rdispl, MPI_DOUBLE,
nrnmpi_comm);
434 extern void nrnmpi_char_alltoallv(
char*
s,
440 MPI_Alltoallv(
s, scnt, sdispl, MPI_CHAR, r, rcnt, rdispl, MPI_CHAR,
nrnmpi_comm);
449 void nrnmpi_int_allgather_inplace(
int* srcdest,
int n) {
450 MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, srcdest,
n, MPI_INT,
nrnmpi_comm);
457 void nrnmpi_int_allgatherv_inplace(
int* srcdest,
int*
n,
int* dspl) {
458 MPI_Allgatherv(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, srcdest,
n, dspl, MPI_INT,
nrnmpi_comm);
461 void nrnmpi_char_allgatherv(
char*
s,
char* r,
int*
n,
int* dspl) {
465 void nrnmpi_long_allgatherv(int64_t*
s, int64_t* r,
int*
n,
int* dspl) {
469 void nrnmpi_long_allgatherv_inplace(
long* srcdest,
int*
n,
int* dspl) {
470 MPI_Allgatherv(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, srcdest,
n, dspl, MPI_LONG,
nrnmpi_comm);
477 void nrnmpi_dbl_allgatherv_inplace(
double* srcdest,
int*
n,
int* dspl) {
478 MPI_Allgatherv(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, srcdest,
n, dspl, MPI_DOUBLE,
nrnmpi_comm);
493 void nrnmpi_str_broadcast_world(std::string& str,
int root) {
494 assert(str.size() <= std::numeric_limits<int>::max());
505 int nrnmpi_int_sum_reduce(
int in) {
511 void nrnmpi_assert_opstep(
int opstep,
double t) {
517 buf[0] = (double) opstep;
520 if (opstep != (
int)
buf[0] ||
t !=
buf[1]) {
527 double nrnmpi_dbl_allmin(
double x) {
536 static void pgvts_op(
double* in,
double* inout,
int* len, MPI_Datatype* dptr) {
538 assert(*dptr == MPI_DOUBLE);
540 if (in[0] < inout[0]) {
543 }
else if (in[0] == inout[0]) {
545 if (in[1] < inout[1]) {
548 }
else if (in[1] == inout[1]) {
550 if (in[2] < inout[2]) {
553 }
else if (in[2] == inout[2]) {
555 if (in[3] < inout[3]) {
563 for (
i = 0;
i < 4; ++
i) {
569 int nrnmpi_pgvts_least(
double*
t,
int* op,
int*
init) {
571 double ibuf[4], obuf[4];
573 ibuf[1] = (double) (*op);
574 ibuf[2] = (double) (*
init);
576 for (
i = 0;
i < 4; ++
i) {
579 MPI_Allreduce(ibuf, obuf, 4, MPI_DOUBLE, mpi_pgvts_op,
nrnmpi_comm);
582 assert((
int) obuf[1] <= *op);
583 if ((
int) obuf[1] == *op) {
585 if ((
int) obuf[2] == *
init) {
592 *
init = (int) obuf[2];
604 void nrnmpi_recv_doubles(
double* pd,
int cnt,
int src,
int tag) {
610 MPI_Irecv(pd,
cnt, MPI_DOUBLE, src, tag,
nrnmpi_comm, (MPI_Request*) request);
615 MPI_Wait((MPI_Request*) request, &
status);
628 }
else if (
type == 2) {
635 double nrnmpi_dbl_allreduce(
double x,
int type) {
644 extern "C" void nrnmpi_dbl_allreduce_vec(
double* src,
double* dest,
int cnt,
int type) {
647 for (
int i = 0;
i <
cnt; ++
i) {
659 for (
int i = 0;
i <
cnt; ++
i) {
668 void nrnmpi_long_allreduce_vec(
long* src,
long* dest,
int cnt,
int type) {
671 for (
int i = 0;
i <
cnt; ++
i) {
680 void nrnmpi_dbl_allgather(
double*
s,
double* r,
int n) {
695 for (
i = 0;
i <
n; ++
i) {
696 MPI_Isend(spk, 1,
spike_type, hosts[
i], 1, bgp_comm, &r);
697 MPI_Request_free(&r);
704 MPI_Iprobe(MPI_ANY_SOURCE, 1, bgp_comm, &flag, &
status);
713 tcnts[0] = nsend - nrecv;
714 MPI_Allreduce(tcnts, tcnts + 1, 1, MPI_INT, MPI_SUM, bgp_comm);
static void nrnmpi_dbl_alltoallv(const double *s, const int *scnt, const int *sdispl, double *r, int *rcnt, int *rdispl)
static void nrnmpi_dbl_allgatherv(double *s, double *r, int *n, int *dspl)
static void nrnmpi_int_allgather(int *s, int *r, int n)
static void nrnmpi_int_alltoallv(const int *s, const int *scnt, const int *sdispl, int *r, int *rcnt, int *rdispl)
static void nrnmpi_barrier()
static void nrnmpi_int_allgatherv(int *s, int *r, int *n, int *dspl)
static int nrnmpi_int_allmax(int x)
#define nrn_spikebuf_size
static void nrnmpi_postrecv_doubles(double *, int, int, int, void **)
static void nrnmpi_wait(void **)
static void nrnmpi_send_doubles(double *, int, int, int)
void nrnmpi_spike_initialize()
void hoc_execerror(const char *s1, const char *s2)
static MPI_Op type2OP(int type)
void * hoc_Emalloc(size_t)
static MPI_Datatype spike_type
int const size_t const size_t n
void nrnmpi_multisend_comm()
int nrnmpi_multisend_single_advance(NRNMPI_Spike *)
void nrnmpi_multisend_multisend(NRNMPI_Spike *, int, int *)
int nrnmpi_multisend_conserve(int nsend, int nrecv)
static NRNMPI_Spike * spikein_
static NRNMPI_Spike * spikeout_
MPI_Comm nrnmpi_world_comm
static void nrnmpi_dbl_broadcast(double *, int, int)
static void nrnmpi_char_broadcast(char *, int, int)
static void nrnmpi_int_broadcast(int *, int, int)