Main Page   Class Hierarchy   Compound List   File List   Compound Members  

ParGraph.hpp

00001 #ifndef _PAR_GRAPH_HPP_
00002 #define _PAR_GRAPH_HPP_
00003 
#include <limits.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

#include <iostream>
#include <numeric>

#include "PG_mpi_traits.hpp"
00006 
00007 /* content: communication layer
00008             abstract communication buffers -> class AllToAllBuffer
00009 */
00010 
00011 namespace ParGraph
00012 {
// size_t is taken from <stddef.h>; the library's own definition stays disabled:
//typedef unsigned int size_t;

/// Integer type for process numbers; comm_layer_t stores its rank and its
/// process count in this type.
typedef unsigned int proc_t;

/// Integer type for vertex identifiers (not used further in this header).
typedef unsigned int VertexID_t;
/// Integer type for edge identifiers (not used further in this header).
typedef unsigned int EdgeID_t;
00018 
00020 void ParGraph_init( int& argc, char**& argv );
00022 void ParGraph_init(int& argc, char**& argv, MPI_Comm& user_comm);
00024 void ParGraph_finalize();
00025 
00027 bool PG_ReduceOr( bool );
00029 bool PG_ReduceAnd( bool );
00031 size_t PG_ReduceAdd( size_t );
00033 template< class A, class Function >
00034 A PG_Reduce( A a, Function f )
00035 {
00036   A* tmp = new A[_libbase.size()];
00037   MPI_Allgather( &a, 1, mpi_traits<A>::mpi_type,
00038                  tmp, 1, mpi_traits<A>::mpi_type, _libbase.comm() );
00039   return std::accumulate( tmp, tmp+_libbase.size(), A(), f );
00040 }
00041 
00043 class comm_layer_t
00044 {
00045     MPI_Comm _comm;
00046     proc_t   _rank;
00047     proc_t   _size;
00048     bool     _mpi_was_init;
00049     int     *_sendrecv_buffer, *scounts, *sdispls, *rcounts, *rdispls;
00050 public:
00051     inline MPI_Comm comm() const
00052     {
00053         return _comm;
00054     }
00055 
00056     inline size_t rank() const
00057     {
00058         return _rank;
00059     }
00060 
00061     inline size_t size() const
00062     {
00063         return _size;
00064     }
00065 
00066     friend void ParGraph_init( int&, char**& );
00067     friend void ParGraph_init( int&, char**&, MPI_Comm& );
00068     friend void ParGraph_common_init();
00069     friend void ParGraph_finalize();
00070     friend void _pg_mpi_reduce_op_wrapper( void*, void*, int*, MPI_Datatype* );
00071     template< class A >
00072     friend A PG_Reduce( A, void (*op)(void*,void*) );
00073 };
00074 
00075 extern comm_layer_t _libbase;
00076 
00078 inline size_t rank()
00079 {
00080   return _libbase.rank();
00081 }
00082 
00084 inline size_t size()
00085 {
00086   return _libbase.size();
00087 }
00088 
/// Size in bytes of the first allocation the put() templates make for a send
/// buffer; the buffer is enlarged later when more room is needed.
const int COMMUNICATION_BUFFER_INITIAL_SIZE = 10000;
00090 
00092 class AllGatherBuffer
00093 {
00094     char *send_buffer;
00095     int send_size;
00096     int send_maxsize;
00097     int send_position;
00098 
00099     char *recv_buffer;
00100     int *recv_buffer_parts;
00101     int *recv_size;
00102     int cur_recv_position;
00103     int whole_recv_size;
00104     proc_t last_recv_proc;
00105 
00106 public:
00107     AllGatherBuffer();
00108     ~AllGatherBuffer();
00109 
00111     template< class A >
00112     void put( A );
00114     template< class A >
00115     void put( const A * element, size_t count );
00116 
00117     void communicate();
00118 
00120     inline bool is_empty() { return cur_recv_position >= whole_recv_size; }
00122     template< class A >
00123     proc_t get( A * element, size_t count = 1 );
00124 };
00125 
00126 template< class A >
00127 inline void
00128 AllGatherBuffer::put( A element )
00129 {
00130   put( & element, 1 );
00131 }
00132 
00133 template< class A >
00134 void
00135 AllGatherBuffer::put( const A * element, size_t count )
00136 {
00137   if( send_buffer == 0 )
00138   {
00139     send_buffer = (char*) malloc( COMMUNICATION_BUFFER_INITIAL_SIZE );
00140     send_maxsize = COMMUNICATION_BUFFER_INITIAL_SIZE;
00141   }
00142 
00143   int diff;
00144   diff = sizeof(int)*count;     // WORKAROUND
00145 //  MPI_Pack_size( count, mpi_traits<A>::mpi_type, _libbase.comm(), &diff );
00146 
00147   if( send_size + diff > send_maxsize )
00148   {
00149     send_maxsize = 2 * (send_size + diff);
00150     if( (send_buffer = (char*) realloc( (void*) send_buffer, send_maxsize )) == 0 )
00151     {
00152       std::cerr<<"Not enough Memory in AllGatherBuffer"<<std::endl; 
00153       exit( 1 );
00154     }
00155   }
00156   send_size += diff;
00157 
00158   for( size_t i = 0 ; i < count ; i++ ) // WORKAROUND Begin
00159   {
00160     *(reinterpret_cast<A *> (send_buffer + send_position)) = *element;
00161     element++;
00162     send_position+=sizeof(int);
00163   } // WORKAROUND End
00164 //  MPI_Pack( const_cast<A*>(element), count, mpi_traits<A>::mpi_type,
00165 //            send_buffer, send_maxsize, &send_position,
00166 //            _libbase.comm() );
00167 }
00168 
00169 template< class A >
00170 proc_t
00171 AllGatherBuffer::get( A * element, size_t count )
00172 {
00173   for(; last_recv_proc < _libbase.size() - 1 && 
00174         recv_buffer_parts[last_recv_proc+1]<=cur_recv_position ; last_recv_proc++ );
00175   for( size_t i = 0 ; i < count ; i++ ) // WORKAROUND Begin
00176   {
00177     *element = *(reinterpret_cast<A *> (recv_buffer + cur_recv_position));
00178     element++;
00179     cur_recv_position+=sizeof(int);
00180   } // WORKAROUND End
00181 //  MPI_Unpack( recv_buffer, whole_recv_size, &cur_recv_position,
00182 //            element, count, mpi_traits<A>::mpi_type,
00183 //            _libbase.comm() );
00184 
00185   return last_recv_proc;
00186 }
00187 
00188 
00189 
00190 class AllToAllBuffer
00191 {
00192     char **send_buffer;
00193     int *send_size;
00194     int *send_maxsize;
00195     int *send_position;
00196     proc_t cur_proc;
00197 
00198     char *recv_buffer;
00199     int *recv_buffer_parts;
00200     int *recv_size;
00201     int cur_recv_position;
00202     int whole_recv_size;
00203     proc_t last_recv_proc;
00204 
00205 public:
00206     AllToAllBuffer();
00207     ~AllToAllBuffer();
00208 
00210     inline void set_proc( proc_t proc ) { cur_proc = proc; }
00212     template< class A >
00213     inline void put( A element );
00215     template< class A >
00216     inline void put( const A * element, size_t count );
00218     template< class A >
00219     void put( const A * element, size_t count, proc_t proc );
00220 
00221     void communicate();
00222 
00224     inline bool is_empty() { return cur_recv_position >= whole_recv_size; }
00226     template< class A >
00227     proc_t get( A * element, size_t count = 1 );
00228 };
00229 
00230 template< class A >
00231 inline void
00232 AllToAllBuffer::put( A element )
00233 {
00234   put( & element, 1, cur_proc );
00235 }
00236 
00237 template< class A >
00238 inline void
00239 AllToAllBuffer::put( const A * element, size_t count )
00240 {
00241   put( element, count, cur_proc );
00242 }
00243 
00244 template<>
00245 inline void
00246 AllToAllBuffer::put( const bool * element, size_t count )
00247 {
00248 // TODO: greater count than 1
00249   int value = (*element) ? -1 : 0;
00250   put( & value, 1, cur_proc );
00251 }
00252 
00253 template< class A >
00254 void
00255 AllToAllBuffer::put( const A * element, size_t count, proc_t proc )
00256 {
00257   if( send_buffer[proc] == 0 )
00258   {
00259     send_buffer[proc] = (char*) malloc( COMMUNICATION_BUFFER_INITIAL_SIZE );
00260     send_maxsize[proc] = COMMUNICATION_BUFFER_INITIAL_SIZE;
00261   }
00262 
00263   int diff;
00264 //  MPI_Pack_size( count, mpi_traits<A>::mpi_type, _libbase.comm(), &diff );
00265   diff=sizeof(int)*count; // WORKAROUND
00266   if( send_size[proc] + diff > send_maxsize[proc] )
00267   {
00268     send_maxsize[proc] = 2 * (send_size[proc] + diff);
00269     if( (send_buffer[proc] = (char*) realloc( (void*) send_buffer[proc], send_maxsize[proc] ))
00270         == 0 )
00271     {
00272       std::cerr<<"Not enough Memory in AllToAllBuffer"<<std::endl; 
00273       exit( 1 );
00274     }
00275   }
00276   send_size[proc] += diff;
00277 
00278   for( size_t i = 0 ; i < count ; i++ ) // WORKAROUND Begin
00279   {
00280     *(reinterpret_cast<A *> (send_buffer[proc] + send_position[proc])) = *element;
00281     element++;
00282     send_position[proc]+=sizeof(int);
00283   } // WORKAROUND End
00284   //MPI_Pack( const_cast<A*>(element), count, mpi_traits<A>::mpi_type,
00285   //          send_buffer[proc], send_maxsize[proc], &send_position[proc],
00286   //          _libbase.comm() );
00287 }
00288 
00289 template< class A >
00290 proc_t
00291 AllToAllBuffer::get( A * element, size_t count )
00292 {
00293   for(; last_recv_proc < _libbase.size() - 1 && 
00294         recv_buffer_parts[last_recv_proc+1]<=cur_recv_position ; last_recv_proc++ );
00295   for( size_t i = 0 ; i < count ; i++ ) // WORKAROUND Begin
00296   {
00297     *element = *(reinterpret_cast<A *> (recv_buffer + cur_recv_position));
00298     element++;
00299     cur_recv_position+=sizeof(int);
00300   } // WORKAROUND End
00301 
00302 //  MPI_Unpack( recv_buffer, whole_recv_size, &cur_recv_position,
00303 //            element, count, mpi_traits<A>::mpi_type,
00304 //            _libbase.comm() );
00305   return last_recv_proc;
00306 }
00307 
00308 template<>
00309 inline proc_t
00310 AllToAllBuffer::get( bool * element, size_t count )
00311 {
00312 // TODO: greater count than 1
00313   int value;
00314   proc_t p = get( & value, 1 );
00315   *element = (value==0) ? false : true;
00316   return p;
00317 }
00318 } // namespace ParGraph
00319 
00320 #endif // _PAR_GRAPH_HPP_

Generated on Sun Feb 29 05:14:24 2004 for ParGraph by doxygen1.3-rc3