NEURON
output_spikes.cpp
Go to the documentation of this file.
1 /*
2 # =============================================================================
3 # Copyright (c) 2016 - 2021 Blue Brain Project/EPFL
4 #
5 # See top-level LICENSE file for details.
6 # =============================================================================.
7 */
8 
9 #include <iostream>
10 #include <sstream>
11 #include <cstring>
12 #include <stdexcept> // std::lenght_error
13 #include <vector>
14 #include <algorithm>
15 #include <numeric>
16 #include <limits>
17 
18 #include "coreneuron/nrnconf.h"
21 #include "coreneuron/mpi/nrnmpi.h"
27 #ifdef ENABLE_SONATA_REPORTS
28 #include "bbp/sonata/reports.h"
29 #endif // ENABLE_SONATA_REPORTS
30 
31 /**
32  * @brief Return all spike vectors to NEURON
33  *
34  * @param spiketvec - vector of spikes at the end of CORENEURON simulation
35  * @param spikegidvec - vector of gids at the end of CORENEURON simulation
36  * @return true if we are in embedded_run and NEURON has successfully retrieved the vectors
37  */
38 static bool all_spikes_return(std::vector<double>& spiketvec, std::vector<int>& spikegidvec) {
40  (*nrn2core_all_spike_vectors_return_)(spiketvec, spikegidvec);
41 }
42 
43 namespace coreneuron {
44 /// --> Coreneuron as SpikeBuffer class
45 std::vector<double> spikevec_time;
46 std::vector<int> spikevec_gid;
47 
48 static OMP_Mutex mut;
49 
50 void mk_spikevec_buffer(int sz) {
51  try {
52  spikevec_time.reserve(sz);
53  spikevec_gid.reserve(sz);
54  } catch (const std::length_error& le) {
55  std::cerr << "Lenght error" << le.what() << std::endl;
56  }
57 }
58 
59 void spikevec_lock() {
60  mut.lock();
61 }
62 
64  mut.unlock();
65 }
66 
67 static void local_spikevec_sort(std::vector<double>& isvect,
68  std::vector<int>& isvecg,
69  std::vector<double>& osvect,
70  std::vector<int>& osvecg) {
71  osvect.resize(isvect.size());
72  osvecg.resize(isvecg.size());
73  // first build a permutation vector
74  std::vector<std::size_t> perm(isvect.size());
75  std::iota(perm.begin(), perm.end(), 0);
76  // sort by time then gid
77  std::sort(perm.begin(), perm.end(), [&](std::size_t i, std::size_t j) {
78  return isvect[i] < isvect[j] || (isvect[i] == isvect[j] && isvecg[i] < isvecg[j]);
79  });
80 
81  // now apply permutation to time and gid output vectors
82  std::transform(perm.begin(), perm.end(), osvect.begin(), [&](std::size_t i) {
83  return isvect[i];
84  });
85  std::transform(perm.begin(), perm.end(), osvecg.begin(), [&](std::size_t i) {
86  return isvecg[i];
87  });
88 }
89 
90 #if NRNMPI
91 
92 static void sort_spikes(std::vector<double>& spikevec_time, std::vector<int>& spikevec_gid) {
93  double lmin_time = std::numeric_limits<double>::max();
94  double lmax_time = std::numeric_limits<double>::min();
95  if (!spikevec_time.empty()) {
96  lmin_time = *(std::min_element(spikevec_time.begin(), spikevec_time.end()));
97  lmax_time = *(std::max_element(spikevec_time.begin(), spikevec_time.end()));
98  }
99  double min_time = nrnmpi_dbl_allmin(lmin_time);
100  double max_time = nrnmpi_dbl_allmax(lmax_time);
101 
102  // allocate send and receive counts and displacements for MPI_Alltoallv
103  std::vector<int> snd_cnts(nrnmpi_numprocs);
104  std::vector<int> rcv_cnts(nrnmpi_numprocs);
105  std::vector<int> snd_dsps(nrnmpi_numprocs);
106  std::vector<int> rcv_dsps(nrnmpi_numprocs);
107 
108  double bin_t = (max_time - min_time) / nrnmpi_numprocs;
109  bin_t = bin_t ? bin_t : 1;
110  // first find number of spikes in each time window
111  for (const auto& st: spikevec_time) {
112  int idx = (int) (st - min_time) / bin_t;
113  snd_cnts[idx]++;
114  }
115  for (int i = 1; i < nrnmpi_numprocs; i++) {
116  snd_dsps[i] = snd_dsps[i - 1] + snd_cnts[i - 1];
117  }
118 
119  // now let each rank know how many spikes they will receive
120  // and get in turn all the buffer sizes to receive
121  nrnmpi_int_alltoall(&snd_cnts[0], &rcv_cnts[0], 1);
122  for (int i = 1; i < nrnmpi_numprocs; i++) {
123  rcv_dsps[i] = rcv_dsps[i - 1] + rcv_cnts[i - 1];
124  }
125  std::size_t new_sz = 0;
126  for (const auto& r: rcv_cnts) {
127  new_sz += r;
128  }
129  // prepare new sorted vectors
130  std::vector<double> svt_buf(new_sz, 0.0);
131  std::vector<int> svg_buf(new_sz, 0);
132 
133  // now exchange data
135  &snd_cnts[0],
136  &snd_dsps[0],
137  svt_buf.data(),
138  &rcv_cnts[0],
139  &rcv_dsps[0]);
141  &snd_cnts[0],
142  &snd_dsps[0],
143  svg_buf.data(),
144  &rcv_cnts[0],
145  &rcv_dsps[0]);
146 
147  local_spikevec_sort(svt_buf, svg_buf, spikevec_time, spikevec_gid);
148 }
149 
150 #ifdef ENABLE_SONATA_REPORTS
151 /** Split spikevec_time and spikevec_gid by populations
152  * Add spike data with population name and gid offset tolibsonatareport API
153  */
154 void output_spike_populations(const SpikesInfo& spikes_info) {
155  // Write spikes with default population name and offset
156  if (spikes_info.population_info.empty()) {
157  sonata_add_spikes_population("All",
158  0,
159  spikevec_time.data(),
160  spikevec_time.size(),
161  spikevec_gid.data(),
162  spikevec_gid.size());
163  return;
164  }
165  int n_populations = spikes_info.population_info.size();
166  for (int idx = 0; idx < n_populations; idx++) {
167  const auto& curr_pop = spikes_info.population_info[idx];
168  std::string population_name = curr_pop.first;
169  int population_offset = curr_pop.second;
170  int gid_lower = population_offset;
171  int gid_upper = std::numeric_limits<int>::max();
172  if (idx != n_populations - 1) {
173  gid_upper = spikes_info.population_info[idx + 1].second - 1;
174  }
175  std::vector<double> pop_spikevec_time;
176  std::vector<int> pop_spikevec_gid;
177  for (int j = 0; j < spikevec_gid.size(); j++) {
178  if (spikevec_gid[j] >= gid_lower && spikevec_gid[j] <= gid_upper) {
179  pop_spikevec_time.push_back(spikevec_time[j]);
180  pop_spikevec_gid.push_back(spikevec_gid[j]);
181  }
182  }
183  sonata_add_spikes_population(population_name.data(),
184  population_offset,
185  pop_spikevec_time.data(),
186  pop_spikevec_time.size(),
187  pop_spikevec_gid.data(),
188  pop_spikevec_gid.size());
189  }
190 }
191 #endif // ENABLE_SONATA_REPORTS
192 
193 /** Write generated spikes to out.dat using mpi parallel i/o.
194  * \todo : MPI related code should be factored into nrnmpi.c
195  * Check spike record length which is set to 64 chars
196  */
197 static void output_spikes_parallel(const char* outpath, const SpikesInfo& spikes_info) {
198  std::stringstream ss;
199  ss << outpath << "/out.dat";
200  std::string fname = ss.str();
201 
202  // remove if file already exist
203  if (nrnmpi_myid == 0) {
204  remove(fname.c_str());
205  }
206 #ifdef ENABLE_SONATA_REPORTS
207  sonata_create_spikefile(outpath, spikes_info.file_name.data());
208  output_spike_populations(spikes_info);
209  sonata_write_spike_populations();
210  sonata_close_spikefile();
211 #endif // ENABLE_SONATA_REPORTS
212 
213  sort_spikes(spikevec_time, spikevec_gid);
214  nrnmpi_barrier();
215 
216  // each spike record in the file is time + gid (64 chars sufficient)
217  const int SPIKE_RECORD_LEN = 64;
218  size_t num_spikes = spikevec_gid.size();
219  size_t num_bytes = (sizeof(char) * num_spikes * SPIKE_RECORD_LEN);
220  char* spike_data = (char*) malloc(num_bytes);
221 
222  if (spike_data == nullptr) {
223  printf("Error while writing spikes due to memory allocation\n");
224  return;
225  }
226 
227  // empty if no spikes
228  strcpy(spike_data, "");
229 
230  // populate buffer with all spike entries
231  char spike_entry[SPIKE_RECORD_LEN];
232  size_t spike_data_offset = 0;
233  for (size_t i = 0; i < num_spikes; i++) {
234  int spike_entry_chars =
235  snprintf(spike_entry, 64, "%.8g\t%d\n", spikevec_time[i], spikevec_gid[i]);
236  spike_data_offset =
237  strcat_at_pos(spike_data, spike_data_offset, spike_entry, spike_entry_chars);
238  }
239 
240  // calculate offset into global file. note that we don't write
241  // all num_bytes but only "populated" buffer
242  size_t num_chars = strlen(spike_data);
243 
244  nrnmpi_write_file(fname, spike_data, num_chars);
245 
246  free(spike_data);
247 }
248 #endif
249 
250 static void output_spikes_serial(const char* outpath) {
251  std::stringstream ss;
252  ss << outpath << "/out.dat";
253  std::string fname = ss.str();
254 
255  // reserve some space for sorted spikevec buffers
256  std::vector<double> sorted_spikevec_time(spikevec_time.size());
257  std::vector<int> sorted_spikevec_gid(spikevec_gid.size());
258  local_spikevec_sort(spikevec_time, spikevec_gid, sorted_spikevec_time, sorted_spikevec_gid);
259 
260  // remove if file already exist
261  remove(fname.c_str());
262 
263  FILE* f = fopen(fname.c_str(), "w");
264  if (!f && nrnmpi_myid == 0) {
265  std::cout << "WARNING: Could not open file for writing spikes." << std::endl;
266  return;
267  }
268 
269  for (std::size_t i = 0; i < sorted_spikevec_gid.size(); ++i)
270  if (sorted_spikevec_gid[i] > -1)
271  fprintf(f, "%.8g\t%d\n", sorted_spikevec_time[i], sorted_spikevec_gid[i]);
272 
273  fclose(f);
274 }
275 
276 void output_spikes(const char* outpath, const SpikesInfo& spikes_info) {
277  // try to transfer spikes to NEURON. If successfull, don't write out.dat
280  return;
281  }
282 #if NRNMPI
283  if (corenrn_param.mpi_enable && nrnmpi_initialized()) {
284  output_spikes_parallel(outpath, spikes_info);
285  } else
286 #endif
287  {
288  output_spikes_serial(outpath);
289  }
291 }
292 
294  auto spikevec_time_capacity = spikevec_time.capacity();
295  auto spikevec_gid_capacity = spikevec_gid.capacity();
296  spikevec_time.clear();
297  spikevec_gid.clear();
298  spikevec_time.reserve(spikevec_time_capacity);
299  spikevec_gid.reserve(spikevec_gid_capacity);
300 }
301 
302 void validation(std::vector<std::pair<double, int>>& res) {
303  for (unsigned i = 0; i < spikevec_gid.size(); ++i)
304  if (spikevec_gid[i] > -1)
305  res.push_back(std::make_pair(spikevec_time[i], spikevec_gid[i]));
306 }
307 } // namespace coreneuron
static void nrnmpi_dbl_alltoallv(const double *s, const int *scnt, const int *sdispl, double *r, int *rcnt, int *rdispl)
static void nrnmpi_int_alltoallv(const int *s, const int *scnt, const int *sdispl, int *r, int *rcnt, int *rdispl)
static void nrnmpi_barrier()
void unlock()
Definition: nrnmutdec.hpp:74
void lock()
Definition: nrnmutdec.hpp:72
#define i
Definition: md1redef.h:19
printf
Definition: extdef.h:5
THIS FILE IS AUTO GENERATED DONT MODIFY IT.
void spikevec_lock()
static OMP_Mutex mut
Definition: nrn_setup.cpp:154
static void output_spikes_serial(const char *outpath)
static void local_spikevec_sort(std::vector< double > &isvect, std::vector< int > &isvecg, std::vector< double > &osvect, std::vector< int > &osvecg)
void mk_spikevec_buffer(int sz)
void spikevec_unlock()
std::vector< double > spikevec_time
--> Coreneuron as SpikeBuffer class
void output_spikes(const char *outpath, const SpikesInfo &spikes_info)
void validation(std::vector< std::pair< double, int >> &res)
void clear_spike_vectors()
std::vector< int > spikevec_gid
corenrn_parameters corenrn_param
Printing method.
data_handle< T > transform(data_handle< T > handle, Transform type)
Definition: node.cpp:32
bool corenrn_embedded
--> Coreneuron
Definition: nrn_setup.cpp:47
int(* nrn2core_all_spike_vectors_return_)(std::vector< double > &spikevec, std::vector< int > &gidvec)
Definition: nrn_setup.cpp:73
size_t j
static double remove(void *v)
Definition: ocdeck.cpp:205
static bool all_spikes_return(std::vector< double > &spiketvec, std::vector< int > &spikegidvec)
Return all spike vectors to NEURON.
unsigned strcat_at_pos(char *dest, unsigned start_position, char *src, unsigned src_length)
Appends a copy of the source string to the destination string.
Utility functions for strings.
bool mpi_enable
Initialization seed for random number generator (int)