
optimize memory layout #39


Draft · wants to merge 9 commits into base: dev
6 changes: 3 additions & 3 deletions examples/6_resource_scope.cpp
@@ -17,7 +17,7 @@ int main()
rg::init(1);
rg::IOResource<int> a; // scope-level=0

rg::emplace_task(
rg::emplace_continuable_task(
[](auto a)
{
std::cout << "scope = " << rg::scope_depth() << std::endl;
@@ -34,8 +34,8 @@ int main()

std::cout << "scope = " << rg::scope_depth() << std::endl;
},
a.read())
.enable_stack_switching();
a.read()
);

rg::finalize();
}
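Aside from whitespace, the change in this example is the switch from the builder-style `rg::emplace_task(...).enable_stack_switching()` to `rg::emplace_continuable_task(...)`, which appears to select the continuable (stack-switching) variant at emplacement time. A minimal before/after sketch; the include path and namespace alias are assumptions based on the repository's other examples and are not part of this hunk:

```cpp
// Sketch only: include path and namespace alias assumed from the other examples.
#include <redGrapes/redGrapes.hpp>

#include <iostream>

namespace rg = redGrapes;

int main()
{
    rg::init(1);
    rg::IOResource<int> a;

    // before: rg::emplace_task([](auto a) { ... }, a.read()).enable_stack_switching();
    // after:  the continuable variant is requested directly at emplacement.
    rg::emplace_continuable_task(
        [](auto a) { std::cout << "scope = " << rg::scope_depth() << std::endl; },
        a.read());

    rg::finalize();
}
```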
131 changes: 70 additions & 61 deletions examples/mpi.cpp
@@ -37,61 +37,59 @@ struct MPIConfig
int main()
{
spdlog::set_pattern("[thread %t] %^[%l]%$ %v");
spdlog::set_level(spdlog::level::trace);
spdlog::set_level( spdlog::level::trace );

/*
int prov;
MPI_Init_thread( nullptr, nullptr, MPI_THREAD_MULTIPLE, &prov );
assert( prov == MPI_THREAD_MULTIPLE );
*/

MPI_Init(nullptr, nullptr);
MPI_Init( nullptr, nullptr );

auto default_scheduler = std::make_shared<rg::scheduler::DefaultScheduler>();
auto mpi_request_pool = std::make_shared<rg::dispatch::mpi::RequestPool>();

hwloc_obj_t obj = hwloc_get_obj_by_type(redGrapes::SingletonContext::get().hwloc_ctx.topology, HWLOC_OBJ_PU, 1);
rg::memory::ChunkedBumpAlloc<rg::memory::HwlocAlloc> mpi_alloc(
rg::memory::HwlocAlloc(redGrapes::SingletonContext::get().hwloc_ctx, obj));
auto mpi_worker = std::make_shared<rg::dispatch::thread::Worker>(
mpi_alloc,
redGrapes::SingletonContext::get().hwloc_ctx,
obj,
4);
hwloc_obj_t obj = hwloc_get_obj_by_type( redGrapes::SingletonContext::get().hwloc_ctx.topology, HWLOC_OBJ_PU, 1 );
rg::memory::ChunkedBumpAlloc< rg::memory::HwlocAlloc > mpi_alloc( rg::memory::HwlocAlloc( redGrapes::SingletonContext::get().hwloc_ctx, obj ) );
auto mpi_worker = std::make_shared<rg::dispatch::thread::Worker>( mpi_alloc, redGrapes::SingletonContext::get().hwloc_ctx, obj, 4 );

// initialize main thread to execute tasks from the mpi-queue and poll
rg::SingletonContext::get().idle = [mpi_worker, mpi_request_pool]
{
mpi_request_pool->poll();

redGrapes::Task* task;

if(task = mpi_worker->ready_queue.pop())
redGrapes::SingletonContext::get().execute_task(*task);

while(mpi_worker->init_dependencies(task, true))
if(task)
{
redGrapes::SingletonContext::get().execute_task(*task);
break;
}
};

rg::init(4, rg::scheduler::make_tag_match_scheduler().add({}, default_scheduler).add({SCHED_MPI}, mpi_worker));

rg::SingletonContext::get().idle =
[mpi_worker, mpi_request_pool]
{
mpi_request_pool->poll();

redGrapes::Task * task;

if( (task = mpi_worker->ready_queue.pop()) )
redGrapes::SingletonContext::get().execute_task( *task );

while( mpi_worker->init_dependencies( task, true ) )
if( task )
{
redGrapes::SingletonContext::get().execute_task( *task );
break;
}
};

rg::init(4,
rg::scheduler::make_tag_match_scheduler()
.add({}, default_scheduler)
.add({ SCHED_MPI }, mpi_worker));

// initialize MPI config
rg::IOResource<MPIConfig> mpi_config;
rg::IOResource< MPIConfig > mpi_config;
rg::emplace_task(
[](auto config)
{
[]( auto config ) {
MPI_Comm_rank(MPI_COMM_WORLD, &config->world_rank);
MPI_Comm_size(MPI_COMM_WORLD, &config->world_size);
},
mpi_config.write())
.scheduling_tags(std::bitset<64>().set(SCHED_MPI));
mpi_config.write()
).scheduling_tags( std::bitset<64>().set(SCHED_MPI) );

// main loop
rg::FieldResource<std::array<int, 4>> field[2] = {
rg::FieldResource< std::array<int, 4> > field[2] = {
rg::FieldResource<std::array<int, 4>>(new std::array<int, 4>()),
rg::FieldResource<std::array<int, 4>>(new std::array<int, 4>()),
};
@@ -100,14 +98,15 @@ int main()

// initialize
rg::emplace_task(
[](auto buf, auto mpi_config)
[]( auto buf, auto mpi_config )
{
int offset = 3 * mpi_config->world_rank;
for(size_t i = 0; i < buf->size(); ++i)
for( size_t i = 0; i < buf->size(); ++i )
buf[{i}] = offset + i;
},
field[current].write(),
mpi_config.read());
mpi_config.read()
);

for(size_t i = 0; i < 1; ++i)
{
@@ -118,67 +117,77 @@
*/

// Send
rg::emplace_task(
[i, current, mpi_request_pool](auto field, auto mpi_config)
rg::emplace_continuable_task(
[i, current, mpi_request_pool]( auto field, auto mpi_config )
{
int dst = (mpi_config->world_rank + 1) % mpi_config->world_size;
int dst = ( mpi_config->world_rank + 1 ) % mpi_config->world_size;

MPI_Request request;
MPI_Isend(&field[{3}], sizeof(int), MPI_CHAR, dst, current, MPI_COMM_WORLD, &request);
MPI_Isend( &field[{3}], sizeof(int), MPI_CHAR, dst, current, MPI_COMM_WORLD, &request );

mpi_request_pool->get_status(request);
mpi_request_pool->get_status( request );
},
field[current].at({3}).read(),
mpi_config.read())
.scheduling_tags({SCHED_MPI})
.enable_stack_switching();
.scheduling_tags({ SCHED_MPI });

// Receive
rg::emplace_task(
[i, current, mpi_request_pool](auto field, auto mpi_config)
rg::emplace_continuable_task(
[i, current, mpi_request_pool]( auto field, auto mpi_config )
{
int src = (mpi_config->world_rank - 1) % mpi_config->world_size;
int src = ( mpi_config->world_rank - 1 ) % mpi_config->world_size;

MPI_Request request;
MPI_Irecv(&field[{0}], sizeof(int), MPI_CHAR, src, current, MPI_COMM_WORLD, &request);
MPI_Irecv( &field[{0}], sizeof(int), MPI_CHAR, src, current, MPI_COMM_WORLD, &request );

MPI_Status status = mpi_request_pool->get_status(request);
MPI_Status status = mpi_request_pool->get_status( request );

int recv_data_count;
MPI_Get_count(&status, MPI_CHAR, &recv_data_count);
MPI_Get_count( &status, MPI_CHAR, &recv_data_count );
},
field[current].at({0}).write(),
mpi_config.read())
.scheduling_tags({SCHED_MPI})
.enable_stack_switching();
.scheduling_tags({ SCHED_MPI });

/*
* Compute iteration
*/
for(size_t i = 1; i < field[current]->size(); ++i)
for( size_t i = 1; i < field[current]->size(); ++i )
rg::emplace_task(
[i](auto dst, auto src) { dst[{i}] = src[{i - 1}]; },
[i]( auto dst, auto src )
{
dst[{i}] = src[{i - 1}];
},
field[next].at({i}).write(),
field[current].at({i - 1}).read());
field[current].at({i-1}).read()
);

/*
* Write Output
*/
rg::emplace_task(
[i](auto buf, auto mpi_config)
{
[i]( auto buf, auto mpi_config )
{
std::cout << "Step[" << i << "], rank[" << mpi_config->world_rank << "] :: ";
for(size_t i = 0; i < buf->size(); ++i)
for( size_t i = 0; i < buf->size(); ++i )
std::cout << buf[{i}] << "; ";
std::cout << std::endl;
},
field[current].read(),
mpi_config.read());
mpi_config.read()
);

current = next;
}

rg::emplace_task([](auto m) { MPI_Finalize(); }, mpi_config.write()).scheduling_tags({SCHED_MPI});
rg::emplace_task(
[]( auto m )
{
MPI_Finalize();
},
mpi_config.write()
).scheduling_tags({ SCHED_MPI });

rg::finalize();
}
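What this example sets up, independent of the formatting churn: a dedicated worker is pinned to a single hwloc PU, a tag-match scheduler routes every task tagged SCHED_MPI to that worker, and the main thread's idle hook both polls the asynchronous MPI request pool and executes tasks from the MPI worker's ready queue. A condensed sketch of that wiring (a fragment, to be read in the context of main() above), using only identifiers from this file and omitting the dependency-initialization retry loop of the idle hook:

```cpp
// Condensed sketch; all identifiers are taken from examples/mpi.cpp above.
auto default_scheduler = std::make_shared<rg::scheduler::DefaultScheduler>();
auto mpi_request_pool = std::make_shared<rg::dispatch::mpi::RequestPool>();

// A dedicated worker pinned to one PU handles everything tagged SCHED_MPI.
hwloc_obj_t obj = hwloc_get_obj_by_type(
    redGrapes::SingletonContext::get().hwloc_ctx.topology, HWLOC_OBJ_PU, 1);
rg::memory::ChunkedBumpAlloc<rg::memory::HwlocAlloc> mpi_alloc(
    rg::memory::HwlocAlloc(redGrapes::SingletonContext::get().hwloc_ctx, obj));
auto mpi_worker = std::make_shared<rg::dispatch::thread::Worker>(
    mpi_alloc, redGrapes::SingletonContext::get().hwloc_ctx, obj, 4);

// While idle, the main thread polls outstanding MPI requests and drains the MPI queue.
rg::SingletonContext::get().idle = [mpi_worker, mpi_request_pool]
{
    mpi_request_pool->poll();
    if(redGrapes::Task* task = mpi_worker->ready_queue.pop())
        redGrapes::SingletonContext::get().execute_task(*task);
};

// Untagged tasks go to the default scheduler; SCHED_MPI-tagged tasks go to the MPI worker.
rg::init(4,
    rg::scheduler::make_tag_match_scheduler()
        .add({}, default_scheduler)
        .add({SCHED_MPI}, mpi_worker));
```

Inside the Send and Receive tasks, `mpi_request_pool->get_status(request)` presumably parks the task until `poll()` completes the request, which is why those tasks are emplaced with `emplace_continuable_task` and tagged SCHED_MPI.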

9 changes: 2 additions & 7 deletions redGrapes/dispatch/thread/execute.cpp
@@ -11,11 +11,6 @@
#include <redGrapes/task/task.hpp>
#include <redGrapes/util/trace.hpp>

#include <boost/mp11/detail/mp_void.hpp>
#include <spdlog/spdlog.h>

#include <optional>

namespace redGrapes
{
/*
@@ -38,8 +33,8 @@ namespace thread

if(event)
{
event->get_event().waker_id = current_worker->get_waker_id();
task.sg_pause(*event);
event.get_event().waker_id = current_waker_id;
task.sg_pause(event);

task.pre_event.up();
task.get_pre_event().notify();
2 changes: 1 addition & 1 deletion redGrapes/dispatch/thread/worker_pool.cpp
@@ -45,7 +45,7 @@ namespace redGrapes
hwloc_obj_t obj = hwloc_get_obj_by_type(hwloc_ctx.topology, HWLOC_OBJ_PU, pu_id);
allocs.emplace_back(memory::HwlocAlloc(hwloc_ctx, obj), REDGRAPES_ALLOC_CHUNKSIZE);

SingletonContext::get().current_arena = pu_id;
SingletonContext::get().current_arena = worker_id;
auto worker
= memory::alloc_shared_bind<WorkerThread>(pu_id, get_alloc(pu_id), hwloc_ctx, obj, worker_id);
// auto worker = std::make_shared< WorkerThread >( get_alloc(i), hwloc_ctx, obj, i );
2 changes: 1 addition & 1 deletion redGrapes/memory/chunked_bump_alloc.hpp
@@ -84,7 +84,7 @@ namespace redGrapes
TRACE_EVENT("Allocator", "ChunkedBumpAlloc::allocate()");
size_t alloc_size = roundup_to_poweroftwo(n);

size_t const chunk_capacity = bump_allocators.get_chunk_capacity();
size_t const chunk_capacity = chunk_size; // bump_allocators.get_chunk_capacity();

if(alloc_size <= chunk_capacity)
{
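One detail worth noting in this last hunk: the requested size is first rounded up to a power of two, and the rounded size is then compared against the chunk capacity, which this branch takes directly from `chunk_size` instead of querying `bump_allocators.get_chunk_capacity()`. For reference, rounding up to a power of two is typically a small helper along these lines; this is a generic sketch, not necessarily redGrapes' `roundup_to_poweroftwo`:

```cpp
#include <cstddef>

// Generic sketch: smallest power of two >= n (n == 0 maps to 1).
// Not necessarily identical to redGrapes' roundup_to_poweroftwo.
inline std::size_t roundup_to_poweroftwo_sketch(std::size_t n)
{
    std::size_t p = 1;
    while(p < n)
        p <<= 1; // double until p reaches or exceeds n
    return p;
}
```

Requests whose rounded size fits within a chunk take the bump-allocator fast path guarded by `if(alloc_size <= chunk_capacity)`; larger requests presumably fall through to a separate path not shown in this hunk.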