Rheolef 7.2 — an efficient C++ finite element environment
hack_array_mpi.icc — implementation file, extracted from the generated
Doxygen documentation page (navigation chrome removed; the embedded
numbers below are the original source-line numbers from that page).
21#include "rheolef/config.h"
22#ifdef _RHEOLEF_HAVE_MPI
23
24#include "rheolef/hack_array.h"
25
26namespace rheolef {
27
28// ===============================================================
29// allocator
30// ===============================================================
// Default constructor (allocator only): all MPI communication state
// (_stash, _send, _receive, _receive_max_size, _ext_x) starts empty.
// NOTE(review): doc line 32 (the qualified signature) is missing from
// this extracted view -- per the declaration list it should read
// hack_array_mpi_rep<T,A>::hack_array_mpi_rep (const A& alloc);
// confirm against the original source before compiling.
31template<class T, class A>
33 : base(alloc),
34 _stash(),
35 _send(),
36 _receive(),
37 _receive_max_size(),
38 _ext_x()
39{
40}
41template<class T, class A>
42hack_array_mpi_rep<T,A>::hack_array_mpi_rep (const distributor& ownership, const parameter_type& param, const A& alloc)
43 : base(ownership,param,alloc),
44 _stash(),
45 _send(),
46 _receive(),
47 _receive_max_size(),
48 _ext_x()
49{
50}
51template <class T, class A>
52void
54{
55 base::resize (ownership, param);
56 _stash.clear();
57 _send.waits.clear();
58 _send.data.clear();
59 _receive.waits.clear();
60 _receive.data.clear();
61 _receive_max_size = 0;
62}
63// ===============================================================
64// assembly
65// ===============================================================
66template <class T, class A>
67void
69{
70 if (ownership().is_owned (dis_i)) {
71 size_type i = dis_i - ownership().first_index();
72 operator[](i) = val;
73 return;
74 }
75 assert_macro (dis_i < ownership().dis_size(), "index "<<dis_i
76 << " is out of range [0:" << ownership().dis_size() << "[");
77 // loop on the raw data vector (fixedsize, run-time known)
78 size_type dis_iraw = base::_value_size*dis_i + 1;
79 typename generic_value_type::const_iterator iter = val._data_begin();
80 for (size_type loc_iraw = 0, loc_nraw = val._data_size(); loc_iraw < loc_nraw; loc_iraw++, iter++) {
81 _stash.insert (std::pair<const size_type,raw_type>(dis_iraw + loc_iraw, *iter));
82 }
83}
// Begin non-blocking assembly of the stashed non-local writes: posts
// the MPI messages via mpi_assembly_begin and records the maximal
// incoming message size; the stash is then dropped (its content is in
// flight).
// NOTE(review): this extracted view is missing doc line 86 (the
// qualified signature, presumably dis_entry_assembly_begin) and doc
// lines 90-91 (the first/last stash-index iterator arguments of
// mpi_assembly_begin, per its documented declaration) -- restore them
// from the original source before compiling.
84template <class T, class A>
85void
87{
88 _receive_max_size = mpi_assembly_begin (
89 _stash,
92 raw_base::ownership(),
93 _receive,
94 _send);
95
96 _stash.clear();
97}
// Finish the non-blocking assembly started by dis_entry_assembly_begin:
// waits on the posted messages and stores each received raw scalar into
// local raw storage (offset by the first owned raw index), applying
// details::generic_set_op; afterwards all communication buffers are
// reset to their empty state.
// NOTE(review): this extracted view is missing doc line 100 (the
// qualified signature, presumably dis_entry_assembly_end) and doc lines
// 102/106 (the head of the mpi_assembly_end call and of the
// disarray_make_store argument, per their documented declarations) --
// restore them from the original source before compiling.
98template <class T, class A>
99void
101{
103 _receive,
104 _send,
105 _receive_max_size,
107 raw_base::begin() - raw_base::ownership().first_index(),
108 details::generic_set_op(),
109 size_type(0),
110 std::false_type()));
111
112 _send.waits.clear();
113 _send.data.clear();
114 _receive.waits.clear();
115 _receive.data.clear();
116 _receive_max_size = 0;
}
118// ===============================================================
119// scatter
120// ===============================================================
// Fetch, for every global index in ext_idx_set, the value owned by a
// remote process, and insert the resulting (index ; value) pairs into
// ext_idx_map. Works on the flat "raw" scalar layout: each element
// occupies _value_size scalars, of which the trailing _data_size carry
// the payload.
// NOTE(review): this extracted view is missing doc lines 131-132 (the
// declarations of tag_init and tag used below), 153-154 (the head of
// the mpi_scatter_init call and the raw_from/raw_to message
// declarations), 168/172 (the head of the mpi_scatter_begin call) and
// 182 (the head of the mpi_scatter_end call) -- restore them from the
// original source before compiling.
125template <class T, class A>
126template <class Set, class Map>
127void
128hack_array_mpi_rep<T,A>::append_dis_entry (const Set& ext_idx_set, Map& ext_idx_map) const
129{
130 // 0) declare the local context for raw data
133
134 // 1) convert set to vector, for direct access & multiply by data_size
135 std::vector<size_type> raw_ext_idx (base::_data_size*ext_idx_set.size());
136 typename Set::const_iterator iter = ext_idx_set.begin();
137 for (size_type i = 0; i < ext_idx_set.size(); i++, iter++) {
138 size_type idx = *iter;
139 for (size_type l = 0; l < base::_data_size; l++) {
140 size_type raw_i = base::_data_size*i + l;
141 size_type raw_idx = base::_value_size*idx + l + (base::_value_size - base::_data_size);
142 raw_ext_idx [raw_i] = raw_idx;
143 }
144 }
145 // 2) declare id[i]=i for scatter
146 std::vector<size_type> raw_id (raw_ext_idx.size());
147 for (size_type i = 0; i < raw_id.size(); i++) raw_id[i] = i;
148
149 // 3) init scatter: raw-scalar index distribution deduced from the
150 //    element distribution, scaled by _value_size
150 size_type raw_dis_size = base::_value_size*ownership().dis_size();
151 size_type raw_size = base::_value_size*ownership().size();
152 distributor raw_ownership (raw_dis_size, ownership().comm(), raw_size);
155 raw_ext_idx.size(),
156 raw_ext_idx.begin().operator->(),
157 raw_id.size(),
158 raw_id.begin().operator->(),
159 raw_ownership.dis_size(),
160 raw_ownership.begin().operator->(),
161 tag_init,
162 raw_ownership.comm(),
163 raw_from,
164 raw_to);
165
166 // 4) begin scatter: send local data to others and get ask for missing data
167 std::vector<raw_type> raw_buffer (raw_ext_idx.size());
169 // access to an iterator to raw_data (behaves as a pointer on raw_type)
170 typedef typename base::base raw_base;
171 typename raw_base::const_iterator raw_begin = raw_base::begin();
173 raw_begin,
174 raw_buffer.begin().operator->(),
175 raw_from,
176 raw_to,
177 details::generic_set_op(),
178 tag,
179 raw_ownership.comm());
180
181 // 5) end scatter: receive missing data
183 raw_begin,
184 raw_buffer.begin().operator->(),
185 raw_from,
186 raw_to,
187 details::generic_set_op(),
188 tag,
189 raw_ownership.comm());
190
191 // 6) unpack raw data: build the associative container: pair (ext_idx ; data)
192 iter = ext_idx_set.begin();
193 for (size_type i = 0; i < ext_idx_set.size(); i++, iter++) {
194 size_type idx = *iter;
195 automatic_value_type value (base::_parameter);
196 typename automatic_value_type::iterator p = value._data_begin();
197 for (size_type l = 0; l < base::_data_size; l++, p++) {
198 size_type raw_i = base::_data_size*i + l;
199 *p = raw_buffer[raw_i];
200 }
201 ext_idx_map.insert (std::make_pair (idx, value));
202 }
203}
204template <class T, class A>
207{
208 if (ownership().is_owned(dis_i)) {
209 size_type i = dis_i - ownership().first_index();
210 return operator[](i);
211 }
212 typename scatter_map_type::const_iterator iter = _ext_x.find (dis_i);
213 check_macro (iter != _ext_x.end(), "unexpected external index="<<dis_i);
214 return (*iter).second;
215}
// Refresh the cached external (non-owned) entries: collect the set of
// global indices currently present in the _ext_x cache and re-fetch
// their values from the owning processes via get_dis_entry.
// NOTE(review): doc line 218 (the qualified signature) is missing from
// this extracted view -- restore it from the original source.
216template <class T, class A>
217void
219{
220 std::set<size_type> ext_i;
221 for (typename scatter_map_type::const_iterator
222 iter = _ext_x.begin(),
223 last = _ext_x.end(); iter != last; ++iter) {
224 ext_i.insert ((*iter).first);
225 }
226 get_dis_entry (ext_i, _ext_x);
227}
228// ===============================================================
229// put & get
230// ===============================================================
// Sequential-order output of the distributed array: the io process
// prints its own chunk, then receives each other process' raw payload
// and prints it element by element; every non-io process packs its
// elements' payload (_data_size scalars each, skipping the leading
// _value_size - _data_size raw slots) and sends it to the io process.
// NOTE(review): this extracted view is missing doc line 233 (the
// return type, odiststream& per the documented declaration) and doc
// line 236 (presumably the message tag declaration; `tag` is used
// below) -- restore them from the original source before compiling.
231template <class T, class A>
232template <class PutFunction>
234hack_array_mpi_rep<T,A>::put_values (odiststream& ops, PutFunction put_element) const
235{
237 std::ostream& os = ops.os();
238
239 // determine maximum message to arrive
240 size_type max_size;
241 mpi::reduce(comm(), size(), max_size, mpi::maximum<size_type>(), 0);
242
243 size_type io_proc = odiststream::io_proc();
244 if (ownership().process() == io_proc) {
245 base::put_values (ops, put_element);
246 // then, receive and print messages
247 std::vector<raw_type> raw_values (max_size*base::_data_size, std::numeric_limits<raw_type>::max());
248 for (size_type iproc = 0; iproc < ownership().n_process(); iproc++) {
249 if (iproc == io_proc) continue;
250 size_type loc_sz_i = ownership().size(iproc);
251 if (loc_sz_i == 0) continue;
252 mpi::status status = comm().recv(iproc, tag, raw_values.begin().operator->(), raw_values.size());
253 boost::optional<int> n_data_opt = status.count<raw_type>();
254 check_macro (n_data_opt, "receive failed");
255 size_type n_data = n_data_opt.get();
256 check_macro (n_data == loc_sz_i*base::_data_size, "unexpected receive message size");
257 typename T::automatic_type tmp (base::_parameter);
258 for (size_type i = 0; i < loc_sz_i; i++) {
259 typename T::iterator p = tmp._data_begin();
260 for (size_type iloc = 0; iloc < base::_data_size; iloc++, p++) {
261 *p = raw_values [i*base::_data_size + iloc];
262 }
263 put_element (os, tmp);
264 os << std::endl;
265 }
266 }
267 os << std::flush;
268 } else {
269 if (size() != 0) {
270 std::vector<raw_type> raw_values (size()*base::_data_size, std::numeric_limits<raw_type>::max());
271 for (size_type i = 0, n = size(); i < n; i++) {
272 for (size_type j = 0; j < base::_data_size; j++) {
273 raw_values [i*base::_data_size + j] = raw_base::operator[] (i*base::_value_size + j+(base::_value_size - base::_data_size));
274 }
275 }
276 comm().send(io_proc, tag, raw_values.begin().operator->(), raw_values.size());
277 }
278 }
279 return ops;
280}
281template <class T, class A>
// Sequential-order input of the distributed array: the io process reads
// its own chunk directly, then reads each other process' chunk into a
// scratch array and sends its raw payload; every non-io process
// receives its payload and unpacks it into local raw storage.
// NOTE(review): this extracted view is missing doc lines 289-290 (the
// return type and qualified name, idiststream& get_values per the
// documented declaration), 292 (presumably the tag declaration) and
// 295 (presumably the io_proc declaration; both names are used below)
// -- restore them from the original source before compiling.
287template <class T, class A>
288template <class GetFunction>
291{
293 std::istream& is = ips.is();
294 size_type my_proc = ownership().process();
296 size_type size_max = 1;
297 for (size_type iproc = 0; iproc < ownership().n_process(); iproc++) {
298 size_max = std::max (size_max, ownership().size(iproc));
299 }
300 distributor io_ownership (size_max, comm(), (my_proc == io_proc) ? size_max : 0);
301 hack_array_seq_rep<T,A> data_proc_j (io_ownership, base::_parameter);
302 if (my_proc == io_proc) {
303 // load first chunk associated to proc 0
304 check_macro (load_chunk (is, begin(), end(), get_element), "read failed on input stream.");
305 if (ownership().n_process() > 1) {
306 // read in other chunks and send to other processors
307 // determine maximum chunk owned by other
308 std::vector<raw_type> raw_values (size_max*base::_data_size, std::numeric_limits<raw_type>::max());
309 typename hack_array_seq_rep<T,A>::iterator start_j = data_proc_j.begin();
310 // NOTE: odd that all blocks are read into the same memory area,
311 // without waiting for the send to complete before the next block overwrites it?
312 for (size_type jproc = 0; jproc < ownership().n_process(); jproc++) {
313 if (jproc == io_proc) continue;
314 // load first chunk associated to proc j
315 size_type loc_sz_j = ownership().size(jproc);
316 if (loc_sz_j == 0) continue;
317 typename hack_array_seq_rep<T,A>::iterator last_j = start_j + loc_sz_j;
318 check_macro (load_chunk (is, start_j, last_j, get_element), "read failed on input stream.");
319 for (size_type i = 0, n = loc_sz_j; i < n; i++) {
320 for (size_type j = 0; j < base::_data_size; j++) {
321 raw_values [i*base::_data_size + j] = data_proc_j.raw_base::operator[] (i*base::_value_size + j+(base::_value_size - base::_data_size));
322 }
323 }
324 comm().send (jproc, tag, raw_values.begin().operator->(), loc_sz_j*base::_data_size);
325 }
326 }
327 } else {
328 if (size() != 0) {
329 std::vector<raw_type> raw_values (size()*base::_data_size, std::numeric_limits<raw_type>::max());
330 comm().recv (io_proc, tag, raw_values.begin().operator->(), size()*base::_data_size);
331 for (size_type i = 0, n = size(); i < n; i++) {
332 for (size_type j = 0; j < base::_data_size; j++) {
333 raw_base::operator[] (i*base::_value_size + j+(base::_value_size - base::_data_size))
334 = raw_values [i*base::_data_size + j];
335 }
336 }
337 }
338 }
339 return ips;
340}
341template <class T, class A>
// Print values in permuted (global) order: build a temporary array
// whose storage is owned entirely by the io process, scatter each local
// value to its permuted global position, assemble, then print
// sequentially on the io process.
// NOTE(review): this extracted view is missing doc lines 349-350 (the
// return type and qualified name, odiststream&
// hack_array_mpi_rep<T,A>::permuted_put_values per the documented
// declaration) and doc line 357 (presumably the io_proc declaration;
// the name is used below) -- restore them from the original source.
347template <class T, class A>
348template <class PutFunction, class Permutation>
351 odiststream& ops,
352 const Permutation& perm,
353 PutFunction put_element) const
354{
355 // NOTE: could be merged with disarray::permuted_put_value : same code exactly
356 assert_macro (perm.size() == size(), "permutation size does not match");
358 size_type my_proc = comm().rank();
359 distributor io_ownership (dis_size(), comm(), (my_proc == io_proc) ? dis_size() : 0);
360 hack_array_mpi_rep<T,A> perm_x (io_ownership, base::_parameter);
361 for (size_type i = 0, n = size(); i < n; i++) {
362 perm_x.dis_entry (perm[i]) = operator[](i);
363 }
364 perm_x.dis_entry_assembly();
365 perm_x.hack_array_seq_rep<T,A>::put_values (ops, put_element);
366 return ops;
367}
368// ===============================================================
369// repartition
370// ===============================================================
371template <class T, class A>
372template <class A2>
373void
374hack_array_mpi_rep<T,A>::repartition ( // old_numbering for *this
375 const disarray_rep<size_type,distributed,A2>& partition, // old_ownership
376 hack_array_mpi_rep<T,A>& new_array, // new_ownership
377 disarray_rep<size_type,distributed,A2>& old_numbering, // new_ownership
378 disarray_rep<size_type,distributed,A2>& new_numbering) const // old_ownership
379{
380 using namespace std;
381 communicator comm = ownership().comm();
382 size_type nproc = comm.size();
383 size_type my_proc = comm.rank();
384 vector<size_type> send_local_elt_size (nproc, 0);
385 typename disarray_rep<size_type,distributed,A2>::const_iterator iter_part = partition.begin();
386 for (size_type ie = 0; ie < partition.size(); ie++, iter_part++) {
387 send_local_elt_size [*iter_part]++;
388 }
389 vector<size_type> recv_local_elt_size (nproc, 0);
390 all_to_all (comm, send_local_elt_size, recv_local_elt_size);
391 vector<size_type> recv_local_elt_start (nproc+1);
392 recv_local_elt_start [0] = 0;
393 for (size_type iproc = 0; iproc < nproc; iproc++) {
394 recv_local_elt_start [iproc+1] = recv_local_elt_start [iproc] + recv_local_elt_size[iproc];
395 }
396 vector<size_type> send_local_elt_start (nproc);
397 all_to_all (comm, recv_local_elt_start.begin().operator->(), send_local_elt_start.begin().operator->());
398 size_type new_local_n_elt = recv_local_elt_start [nproc];
399 size_type global_n_elt = dis_size();
400
401 // re-distribute data:
402 distributor new_elt_ownership (global_n_elt, comm, new_local_n_elt);
403 new_array.resize (new_elt_ownership, base::_parameter); // CHANGE FROM ARRAY here
404 old_numbering.resize (new_elt_ownership, numeric_limits<size_type>::max());
405 new_numbering.resize (ownership(), numeric_limits<size_type>::max());
406 iter_part = partition.begin();
407 const_iterator iter_elt = begin();
408 typename disarray_rep<size_type,distributed,A2>::iterator iter_new_num_elt = new_numbering.begin();
409 for (size_type ie = 0, ne = partition.size(); ie < ne; ie++, iter_part++, iter_elt++, iter_new_num_elt++) {
410 size_type iproc = *iter_part;
411 const generic_value_type& x = *iter_elt; // CHANGE FROM ARRAY here
412 size_type new_global_ie = new_elt_ownership[iproc] + send_local_elt_start[iproc];
413 new_array.dis_entry (new_global_ie) = x;
414 *iter_new_num_elt = new_global_ie;
415 size_type old_global_ie = ownership()[my_proc] + ie;
416 old_numbering.dis_entry (new_global_ie) = old_global_ie;
417 send_local_elt_start[iproc]++;
418 }
419 new_array.dis_entry_assembly();
420 old_numbering.template dis_entry_assembly<typename details::default_set_op_traits<size_type>::type>();
421}
422
423} // namespace rheolef
424#endif // _RHEOLEF_HAVE_MPI
field::size_type size_type
Definition branch.cc:430
see the communicator page for the full documentation
see the distributor page for the full documentation
Definition distributor.h:69
size_type dis_size() const
global and local sizes
static tag_type get_new_tag()
returns a new tag
const communicator_type & comm() const
const_reference dis_at(size_type dis_i) const
odiststream & put_values(odiststream &ops) const
void repartition(const disarray_rep< size_type, distributed, A2 > &partition, hack_array_mpi_rep< T, A > &new_array, disarray_rep< size_type, distributed, A2 > &old_numbering, disarray_rep< size_type, distributed, A2 > &new_numbering) const
odiststream & permuted_put_values(odiststream &ops, const Permutation &perm, PutFunction put_element) const
base::size_type size_type
Definition hack_array.h:164
base::const_reference const_reference
Definition hack_array.h:172
hack_array_mpi_rep(const A &alloc=A())
void resize(const distributor &ownership, const parameter_type &param)
base::const_iterator const_iterator
Definition hack_array.h:174
base::parameter_type parameter_type
Definition hack_array.h:170
base::automatic_value_type automatic_value_type
Definition hack_array.h:168
void append_dis_entry(const Set &ext_idx_set, Map &ext_idx_map) const
get values from ext_idx_set, that are managed by another proc new version: instead of sending automat...
base::generic_value_type generic_value_type
Definition hack_array.h:167
void set_dis_entry(size_type dis_i, const generic_value_type &val)
idiststream & get_values(idiststream &ips)
hack_array_iterator< generic_value_type, generic_value_type &, generic_value_type *, raw_type, raw_type * > iterator
Definition hack_array.h:98
idiststream: see the diststream page for the full documentation
Definition diststream.h:336
std::istream & is()
Definition diststream.h:400
odiststream: see the diststream page for the full documentation
Definition diststream.h:137
std::ostream & os()
Definition diststream.h:247
static size_type io_proc()
Definition diststream.cc:79
#define assert_macro(ok_condition, message)
Definition dis_macros.h:113
check_macro(expr1.have_homogeneous_space(Xh1), "dual(expr1,expr2); expr1 should have homogeneous space. HINT: use dual(interpolate(Xh, expr1),expr2)")
This file is part of Rheolef.
Size mpi_assembly_end(Message &receive, Message &send, Size receive_max_size, Container x)
void mpi_scatter_end(InputIterator x, OutputIterator y, Message &from, Message &to, SetOp op, Tag tag, Comm comm)
void mpi_scatter_begin(InputIterator x, OutputIterator y, Message &from, Message &to, SetOp op, Tag tag, Comm comm)
bool load_chunk(std::istream &s, RandomIterator iter, RandomIterator last)
Definition load_chunk.h:27
disarray_store< OutputRandomIterator, SetOp, Size, IsContainer > disarray_make_store(OutputRandomIterator x, SetOp op, Size, IsContainer)
void mpi_scatter_init(Size nidx, SizeRandomIterator1 idx, Size nidy, SizeRandomIterator2 idy, Size idy_maxval, SizeRandomIterator3 ownership, Tag tag, const distributor::communicator_type &comm, Message &from, Message &to)
apply_iterator< Iterator, Operator > make_apply_iterator(Iterator i, Operator op)
Definition msg_util.h:114
Stash::size_type mpi_assembly_begin(const Stash &stash, InputIterator first_stash_idx, InputIterator last_stash_idx, const distributor &ownership, Message &receive, Message &send)
STL namespace.
Definition sphere.icc:25
iterator begin()
Definition Vector.h:217
disarray element input helper
Definition disarray.h:205
disarray element output helper
Definition disarray.h:196