Rheolef  7.2
an efficient C++ finite element environment
 
mpi_scatter_init.h
#ifndef _RHEO_MPI_SCATTER_INIT_H
#define _RHEO_MPI_SCATTER_INIT_H

#include "rheolef/compiler.h"
#include "rheolef/communicator.h"
#include "rheolef/scatter_message.h"

#include "rheolef/msg_sort_with_permutation.h"
#include "rheolef/msg_to_context.h"
#include "rheolef/msg_from_context_pattern.h"
#include "rheolef/msg_from_context_indices.h"
#include "rheolef/msg_local_context.h"
#include "rheolef/msg_local_optimize.h"

#include "rheolef/msg_util.h"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Weffc++"
#pragma GCC diagnostic ignored "-Wparentheses"
#pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
#include <boost/functional.hpp>
#include <boost/iterator/transform_iterator.hpp>
#pragma GCC diagnostic pop
/*F:
NAME: mpi_scatter_init -- gather/scatter initialize (@PACKAGE@ @VERSION@)
DESCRIPTION:
 Initialize the communication
 for a distributed to sequential scatter context.
COMPLEXITY:
 Time and memory complexity is O(nidx+nproc).
 For finite-element problems in d dimensions

| nidx ~ N^((d-1)/d)

 where N is the number of degrees of freedom.

IMPLEMENTATION:
 Inspired by petsc-2.0/vpscat.c: VecScatterCreate_PtoS()
AUTHORS:
 LMC-IMAG, 38041 Grenoble cedex 9, France
 | Pierre.Saramito@imag.fr
DATE: 23 March 1999
END:
*/
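/*
   Usage sketch (illustrative only; the scatter_message instantiation, the
   idy_maxval argument and the way idx, idy and ownership are filled below are
   assumptions, not a documented calling convention):

     std::vector<std::size_t> idx;        // sorted global indices to fetch
     std::vector<std::size_t> idy;        // local slots receiving each value
     std::vector<std::size_t> ownership;  // partition table, size nproc+1
     // ... fill idx, idy and ownership; tag and comm come from the caller ...
     scatter_message<std::vector<double> > from, to;
     mpi_scatter_init (idx.size(), idx.begin(), idy.size(), idy.begin(),
                       idy.size(), ownership.begin(), tag, comm, from, to);
*/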

namespace rheolef {

//<mpi_scatter_init:
template <class Message, class Size, class SizeRandomIterator1,
          class SizeRandomIterator2, class SizeRandomIterator3, class Tag>
void
mpi_scatter_init (
// input:
  Size nidx,
  SizeRandomIterator1 idx,
  Size nidy,
  SizeRandomIterator2 idy,
  Size idy_maxval,
  SizeRandomIterator3 ownership,
  Tag tag,
  const distributor::communicator_type& comm,
// output:
  Message& from,
  Message& to)
{
  typedef Size size_type;
  size_type my_proc = comm.rank();
  size_type nproc   = comm.size();

  // -------------------------------------------------------
  // 1) first count number of contributors to each processor
  // -------------------------------------------------------
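  // note: ownership is expected to be a partition table with nproc+1 entries:
  // processor iproc owns the global index range [ownership[iproc], ownership[iproc+1]).
  // idx[] is assumed sorted, so the owner scan below never moves iproc backwards.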
  std::vector<size_type> msg_size(nproc, 0);
  std::vector<size_type> msg_mark(nproc, 0);
  std::vector<size_type> owner   (nidx);
  size_type send_nproc = 0;
  {
    size_type iproc = 0;
    for (size_type i = 0; i < nidx; i++) {
      for (; iproc < nproc; iproc++) {
        if (idx[i] >= ownership[iproc] && idx[i] < ownership[iproc+1]) {
          owner[i] = iproc;
          msg_size[iproc]++;
          if (!msg_mark[iproc]) {
            msg_mark[iproc] = 1;
            send_nproc++;
          }
          break;
        }
      }
      check_macro (iproc != nproc, "bad stash data: idx["<<i<<"]="<<idx[i]<<" out of range [0:"<<ownership[nproc]<<"[");
    }
  } // end block
  // -------------------------------------------------------
  // 2) avoid sending a message to my_proc in the counting
  // -------------------------------------------------------
  size_type n_local = msg_size[my_proc];
  if (n_local != 0) {
    msg_size[my_proc] = 0;
    msg_mark[my_proc] = 0;
    send_nproc--;
  }
  // ----------------------------------------------------------------
  // 3) compute the number of messages to be sent to my_proc
  // ----------------------------------------------------------------
  std::vector<size_type> work(nproc);
  mpi::all_reduce (
    comm,
    msg_mark.begin().operator->(),
    nproc,
    work.begin().operator->(),
    std::plus<size_type>());
  size_type receive_nproc = work[my_proc];
  // ----------------------------------------------------------------
  // 4) compute the maximum message size to be sent to my_proc
  // ----------------------------------------------------------------
  mpi::all_reduce (
    comm,
    msg_size.begin().operator->(),
    nproc,
    work.begin().operator->(),
    mpi::maximum<size_type>());
  size_type receive_max_size = work[my_proc];
  // ----------------------------------------------------------------
  // 5) post receives: exchange the buffer addresses between processes
  // ----------------------------------------------------------------
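  // note: every remote contributor sends at most receive_max_size indices, so a
  // single contiguous buffer of receive_nproc*receive_max_size entries suffices
  // and one non-blocking receive is posted per expected message.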
  std::list<std::pair<size_type,mpi::request> > receive_waits;
  std::vector<size_type> receive_data (receive_nproc*receive_max_size);
  for (size_type i_receive = 0; i_receive < receive_nproc; i_receive++) {
    mpi::request i_req = comm.irecv (
      mpi::any_source,
      tag,
      receive_data.begin().operator->() + i_receive*receive_max_size,
      receive_max_size);
    receive_waits.push_back (std::make_pair(i_receive, i_req));
  }
  // ---------------------------------------------------------------------------
  // 6) compute the send indices
  // ---------------------------------------------------------------------------
  // since idx is sorted, we can copy idx directly into send_data;
  // consequently, owner and send_data_ownership are not needed
  std::vector<size_type> send_data (nidx);
  std::copy (idx, idx+nidx, send_data.begin());
  // ---------------------------------------------------------------------------
  // 7) do send
  // ---------------------------------------------------------------------------
  std::list<std::pair<size_type,mpi::request> > send_waits;
  {
    size_type i_send  = 0;
    size_type i_start = 0;
    for (size_type iproc = 0; iproc < nproc; iproc++) {
      size_type i_msg_size = msg_size[iproc];
      if (i_msg_size == 0) continue;
      mpi::request i_req = comm.isend (
        iproc,
        tag,
        send_data.begin().operator->() + i_start,
        i_msg_size);
      send_waits.push_back(std::make_pair(i_send,i_req));
      i_send++;
      i_start += i_msg_size;
    }
  } // end block
  // ---------------------------------------------------------------------------
  // 8) wait on receives
  // ---------------------------------------------------------------------------
  // note: for wait_all, build an iterator adapter that scans pair.second in the
  // [index,request] pairs; iter.base() then gives back an iterator to the pair,
  // so the corresponding index can be retrieved to compute the position in the
  // receive_data buffer
  typedef boost::transform_iterator<select2nd<size_t,mpi::request>, std::list<std::pair<size_t,mpi::request> >::iterator>
    request_iterator;
  std::vector<size_type> receive_size (receive_nproc);
  std::vector<size_type> receive_proc (receive_nproc);
  size_type receive_total_size = 0;
  while (receive_waits.size() != 0) {
    typedef size_type data_type; // exchanged data is of "size_type"
    request_iterator iter_r_waits (receive_waits.begin(), select2nd<size_t,mpi::request>()),
                     last_r_waits (receive_waits.end(),   select2nd<size_t,mpi::request>());
    // wait on any receive...
    std::pair<mpi::status,request_iterator> pair_status = mpi::wait_any (iter_r_waits, last_r_waits);
    // check status
    boost::optional<int> i_msg_size_opt = pair_status.first.count<data_type>();
    check_macro (i_msg_size_opt, "receive wait failed");
    int iproc = pair_status.first.source();
    check_macro (iproc >= 0, "receive: source iproc = "<<iproc<<" < 0 !");
    // get the size of the receive and its position in the data
    size_type i_msg_size = (size_t)i_msg_size_opt.get();
    std::list<std::pair<size_t,mpi::request> >::iterator i_pair_ptr = pair_status.second.base();
    size_type i_receive = (*i_pair_ptr).first;
    receive_proc[i_receive] = iproc;
    receive_size[i_receive] = i_msg_size;
    receive_total_size += i_msg_size;
    receive_waits.erase (i_pair_ptr);
  }
  // ---------------------------------------------------------------------------
  // 9) allocate the entire send(to) scatter context
  // ---------------------------------------------------------------------------
  to.resize (receive_total_size, receive_nproc);

  // ---------------------------------------------------------------------------
  // 10) compute the permutation that sorts the receive_proc[] sequence
  // ---------------------------------------------------------------------------
  // init: perm[i] = i
  std::vector<size_type> perm(receive_nproc);
  copy(index_iterator<size_type>(), index_iterator<size_type>(receive_nproc), perm.begin());
  sort_with_permutation (
    receive_proc.begin().operator->(),
    perm.begin().operator->(),
    receive_nproc);
  // ---------------------------------------------------------------------------
  // 11) compute the receive compressed message pattern for send(to)
  // ---------------------------------------------------------------------------
  size_type istart = ownership[my_proc]; // = ownership.first_index()
  msg_to_context (
    perm.begin(),
    perm.end(),
    receive_proc.begin(),
    receive_size.begin(),
    receive_data.begin(),
    receive_max_size,
    istart,
    to.procs().begin(),
    to.starts().begin(),
    to.indices().begin());
  // ---------------------------------------------------------------------------
  // 12) allocate the entire receive(from) scatter context
  // ---------------------------------------------------------------------------
  from.resize(nidy, send_nproc);
  // ---------------------------------------------------------------------------
  // 13) compute the receive compressed message pattern for receive(from)
  // ---------------------------------------------------------------------------
  std::vector<size_type> proc2from_proc(nproc);
  msg_from_context_pattern (
    msg_size.begin(),
    msg_size.end(),
    from.procs().begin(),
    from.starts().begin(),
    proc2from_proc.begin());
  // ---------------------------------------------------------------------------
  // 14) compute the receive compressed message indices for receive(from)
  // ---------------------------------------------------------------------------
  // assume that indices are sorted in increasing order
  std::vector<size_type> start(send_nproc+1);
  copy (from.starts().begin(), from.starts().end(), start.begin());
  msg_from_context_indices (
    owner.begin(),
    owner.end(),
    idy,
    proc2from_proc.begin(),
    my_proc,
    idy_maxval,
    start.begin(),
    from.indices().begin());
  // ---------------------------------------------------------------------------
  // 15) wait on sends
  // ---------------------------------------------------------------------------
  if (send_waits.size() != 0) {
    request_iterator iter_s_waits (send_waits.begin(), select2nd<size_type,mpi::request>()),
                     last_s_waits (send_waits.end(),   select2nd<size_type,mpi::request>());
    mpi::wait_all (iter_s_waits, last_s_waits);
  }
  // ---------------------------------------------------------------------------
  // 16) compute the receive compressed message local pattern,
  //     i.e. the only part that does not require communication.
  // ---------------------------------------------------------------------------
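  // note: the n_local indices owned by my_proc are exchanged without any MPI
  // call; they are matched directly into to.local_slots and from.local_slots.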
  from.local_slots.resize(n_local);
  to.local_slots.resize(n_local);
  size_type ilast = ownership[my_proc+1]; // = ownership.last_index()
  msg_local_context (
    idx,
    idx+nidx,
    idy,
    idy_maxval,
    istart,
    ilast,
    to.local_slots.begin(),
    to.local_slots.end(),
    from.local_slots.begin());
  // ---------------------------------------------------------------------------
  // 17) optimize local exchanges during gather/scatter
  // ---------------------------------------------------------------------------
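  // note: when msg_local_optimize detects that the local exchange reduces to a
  // single block copy, only a start index and a length are recorded below.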
  bool has_opt = msg_local_optimize (
    to.local_slots.begin(),
    to.local_slots.end(),
    from.local_slots.begin());

  if (has_opt && n_local != 0) {
    to.local_is_copy        = true;
    to.local_copy_start     = to.local_slots[0];
    to.local_copy_length    = n_local;
    from.local_is_copy      = true;
    from.local_copy_start   = from.local_slots[0];
    from.local_copy_length  = n_local;
  }
}
//>mpi_scatter_init:
} // namespace rheolef
#endif // _RHEO_MPI_SCATTER_INIT_H