00001 #ifndef _PAR_GRAPH_HPP_
00002 #define _PAR_GRAPH_HPP_
00003
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

#include <iostream>
#include <numeric>

#include "PG_mpi_traits.hpp"
00006
00007
00008
00009
00010
00011 namespace ParGraph
00012 {
00013
/// Integer type used for process ranks and process counts
/// (see comm_layer_t and the proc_t returned by the buffers' get()).
typedef unsigned proc_t;

/// Integer identifier types; presumably for graph vertices and edges
/// (not used in this header -- TODO confirm against the graph classes).
typedef unsigned VertexID_t;
typedef unsigned EdgeID_t;
00018
00020 void ParGraph_init( int& argc, char**& argv );
00022 void ParGraph_init(int& argc, char**& argv, MPI_Comm& user_comm);
00024 void ParGraph_finalize();
00025
/// Reduction helpers declared here and defined elsewhere.
/// Judging by their names they combine one value per process with
/// logical OR, logical AND and addition respectively -- TODO confirm
/// against the definitions.
bool   PG_ReduceOr( bool local_value );
bool   PG_ReduceAnd( bool local_value );
size_t PG_ReduceAdd( size_t local_value );
00033 template< class A, class Function >
00034 A PG_Reduce( A a, Function f )
00035 {
00036 A* tmp = new A[_libbase.size()];
00037 MPI_Allgather( &a, 1, mpi_traits<A>::mpi_type,
00038 tmp, 1, mpi_traits<A>::mpi_type, _libbase.comm() );
00039 return std::accumulate( tmp, tmp+_libbase.size(), A(), f );
00040 }
00041
00043 class comm_layer_t
00044 {
00045 MPI_Comm _comm;
00046 proc_t _rank;
00047 proc_t _size;
00048 bool _mpi_was_init;
00049 int *_sendrecv_buffer, *scounts, *sdispls, *rcounts, *rdispls;
00050 public:
00051 inline MPI_Comm comm() const
00052 {
00053 return _comm;
00054 }
00055
00056 inline size_t rank() const
00057 {
00058 return _rank;
00059 }
00060
00061 inline size_t size() const
00062 {
00063 return _size;
00064 }
00065
00066 friend void ParGraph_init( int&, char**& );
00067 friend void ParGraph_init( int&, char**&, MPI_Comm& );
00068 friend void ParGraph_common_init();
00069 friend void ParGraph_finalize();
00070 friend void _pg_mpi_reduce_op_wrapper( void*, void*, int*, MPI_Datatype* );
00071 template< class A >
00072 friend A PG_Reduce( A, void (*op)(void*,void*) );
00073 };
00074
00075 extern comm_layer_t _libbase;
00076
00078 inline size_t rank()
00079 {
00080 return _libbase.rank();
00081 }
00082
00084 inline size_t size()
00085 {
00086 return _libbase.size();
00087 }
00088
/// Size in bytes of the first allocation of a send buffer
/// (passed to malloc the first time put() is called on a buffer).
static const int COMMUNICATION_BUFFER_INITIAL_SIZE = 10000;
00090
00092 class AllGatherBuffer
00093 {
00094 char *send_buffer;
00095 int send_size;
00096 int send_maxsize;
00097 int send_position;
00098
00099 char *recv_buffer;
00100 int *recv_buffer_parts;
00101 int *recv_size;
00102 int cur_recv_position;
00103 int whole_recv_size;
00104 proc_t last_recv_proc;
00105
00106 public:
00107 AllGatherBuffer();
00108 ~AllGatherBuffer();
00109
00111 template< class A >
00112 void put( A );
00114 template< class A >
00115 void put( const A * element, size_t count );
00116
00117 void communicate();
00118
00120 inline bool is_empty() { return cur_recv_position >= whole_recv_size; }
00122 template< class A >
00123 proc_t get( A * element, size_t count = 1 );
00124 };
00125
00126 template< class A >
00127 inline void
00128 AllGatherBuffer::put( A element )
00129 {
00130 put( & element, 1 );
00131 }
00132
00133 template< class A >
00134 void
00135 AllGatherBuffer::put( const A * element, size_t count )
00136 {
00137 if( send_buffer == 0 )
00138 {
00139 send_buffer = (char*) malloc( COMMUNICATION_BUFFER_INITIAL_SIZE );
00140 send_maxsize = COMMUNICATION_BUFFER_INITIAL_SIZE;
00141 }
00142
00143 int diff;
00144 diff = sizeof(int)*count;
00145
00146
00147 if( send_size + diff > send_maxsize )
00148 {
00149 send_maxsize = 2 * (send_size + diff);
00150 if( (send_buffer = (char*) realloc( (void*) send_buffer, send_maxsize )) == 0 )
00151 {
00152 std::cerr<<"Not enough Memory in AllGatherBuffer"<<std::endl;
00153 exit( 1 );
00154 }
00155 }
00156 send_size += diff;
00157
00158 for( size_t i = 0 ; i < count ; i++ )
00159 {
00160 *(reinterpret_cast<A *> (send_buffer + send_position)) = *element;
00161 element++;
00162 send_position+=sizeof(int);
00163 }
00164
00165
00166
00167 }
00168
00169 template< class A >
00170 proc_t
00171 AllGatherBuffer::get( A * element, size_t count )
00172 {
00173 for(; last_recv_proc < _libbase.size() - 1 &&
00174 recv_buffer_parts[last_recv_proc+1]<=cur_recv_position ; last_recv_proc++ );
00175 for( size_t i = 0 ; i < count ; i++ )
00176 {
00177 *element = *(reinterpret_cast<A *> (recv_buffer + cur_recv_position));
00178 element++;
00179 cur_recv_position+=sizeof(int);
00180 }
00181
00182
00183
00184
00185 return last_recv_proc;
00186 }
00187
00188
00189
00190 class AllToAllBuffer
00191 {
00192 char **send_buffer;
00193 int *send_size;
00194 int *send_maxsize;
00195 int *send_position;
00196 proc_t cur_proc;
00197
00198 char *recv_buffer;
00199 int *recv_buffer_parts;
00200 int *recv_size;
00201 int cur_recv_position;
00202 int whole_recv_size;
00203 proc_t last_recv_proc;
00204
00205 public:
00206 AllToAllBuffer();
00207 ~AllToAllBuffer();
00208
00210 inline void set_proc( proc_t proc ) { cur_proc = proc; }
00212 template< class A >
00213 inline void put( A element );
00215 template< class A >
00216 inline void put( const A * element, size_t count );
00218 template< class A >
00219 void put( const A * element, size_t count, proc_t proc );
00220
00221 void communicate();
00222
00224 inline bool is_empty() { return cur_recv_position >= whole_recv_size; }
00226 template< class A >
00227 proc_t get( A * element, size_t count = 1 );
00228 };
00229
00230 template< class A >
00231 inline void
00232 AllToAllBuffer::put( A element )
00233 {
00234 put( & element, 1, cur_proc );
00235 }
00236
00237 template< class A >
00238 inline void
00239 AllToAllBuffer::put( const A * element, size_t count )
00240 {
00241 put( element, count, cur_proc );
00242 }
00243
00244 template<>
00245 inline void
00246 AllToAllBuffer::put( const bool * element, size_t count )
00247 {
00248
00249 int value = (*element) ? -1 : 0;
00250 put( & value, 1, cur_proc );
00251 }
00252
00253 template< class A >
00254 void
00255 AllToAllBuffer::put( const A * element, size_t count, proc_t proc )
00256 {
00257 if( send_buffer[proc] == 0 )
00258 {
00259 send_buffer[proc] = (char*) malloc( COMMUNICATION_BUFFER_INITIAL_SIZE );
00260 send_maxsize[proc] = COMMUNICATION_BUFFER_INITIAL_SIZE;
00261 }
00262
00263 int diff;
00264
00265 diff=sizeof(int)*count;
00266 if( send_size[proc] + diff > send_maxsize[proc] )
00267 {
00268 send_maxsize[proc] = 2 * (send_size[proc] + diff);
00269 if( (send_buffer[proc] = (char*) realloc( (void*) send_buffer[proc], send_maxsize[proc] ))
00270 == 0 )
00271 {
00272 std::cerr<<"Not enough Memory in AllToAllBuffer"<<std::endl;
00273 exit( 1 );
00274 }
00275 }
00276 send_size[proc] += diff;
00277
00278 for( size_t i = 0 ; i < count ; i++ )
00279 {
00280 *(reinterpret_cast<A *> (send_buffer[proc] + send_position[proc])) = *element;
00281 element++;
00282 send_position[proc]+=sizeof(int);
00283 }
00284
00285
00286
00287 }
00288
00289 template< class A >
00290 proc_t
00291 AllToAllBuffer::get( A * element, size_t count )
00292 {
00293 for(; last_recv_proc < _libbase.size() - 1 &&
00294 recv_buffer_parts[last_recv_proc+1]<=cur_recv_position ; last_recv_proc++ );
00295 for( size_t i = 0 ; i < count ; i++ )
00296 {
00297 *element = *(reinterpret_cast<A *> (recv_buffer + cur_recv_position));
00298 element++;
00299 cur_recv_position+=sizeof(int);
00300 }
00301
00302
00303
00304
00305 return last_recv_proc;
00306 }
00307
00308 template<>
00309 inline proc_t
00310 AllToAllBuffer::get( bool * element, size_t count )
00311 {
00312
00313 int value;
00314 proc_t p = get( & value, 1 );
00315 *element = (value==0) ? false : true;
00316 return p;
00317 }
00318 }
00319
00320 #endif // _PAR_GRAPH_HPP_