subroutine node_coordinator_impl(ctx)
!! Internal implementation of node_coordinator with typed context.
!!
!! Middle tier of a three-level work-distribution scheme: this rank brokers
!! fragment work between the group/global coordinator (over world_comm) and
!! the worker ranks on its own node (over node_comm). Each main-loop
!! iteration services, in order:
!!   PRIORITY 1 - completed results from local workers (forwarded upstream), and
!!   PRIORITY 2 - work requests from local workers (fragment fetched upstream
!!                and relayed down, or a finish signal when work is exhausted).
!! The loop exits once every local worker (node_comm size minus this
!! coordinator rank) has been sent TAG_WORKER_FINISH.
!! On any worker-reported calculation error the whole job is aborted via
!! abort_comm on world_comm.
use mqc_many_body_expansion, only: many_body_expansion_t
class(many_body_expansion_t), intent(in) :: ctx
integer :: group_leader_rank, group_id
integer(int64) :: fragment_idx
integer(int32) :: fragment_size, fragment_type, dummy_msg
integer(int32) :: finished_workers
integer(int32), allocatable :: fragment_indices(:)
type(MPI_Status) :: status, global_status
logical :: local_message_pending, more_fragments, has_result
integer(int32) :: local_dummy
! For tracking worker-fragment mapping and collecting results.
! Indexed directly by node_comm rank; 0 means "no fragment outstanding".
! NOTE(review): MPI ranks start at 0 while this array starts at 1 — this
! only works if the coordinator's own rank never appears as MPI_SOURCE and
! worker ranks are >= 1 on node_comm. TODO confirm rank layout.
integer(int64) :: worker_fragment_map(ctx%resources%mpi_comms%node_comm%size())
integer(int32) :: worker_source
type(calculation_result_t) :: worker_result
! MPI request handles for non-blocking operations
type(request_t) :: req
! If this rank is itself the group leader, it runs the group/global
! coordinator role instead of the node-coordinator loop below.
call get_group_leader_rank(ctx, ctx%resources%mpi_comms%world_comm%rank(), group_leader_rank, group_id)
if (group_leader_rank == ctx%resources%mpi_comms%world_comm%rank()) then
call group_global_coordinator_impl(ctx)
return
end if
finished_workers = 0
more_fragments = .true.
dummy_msg = 0
worker_fragment_map = 0
! Main service loop: run until every local worker has been told to finish.
do while (finished_workers < ctx%resources%mpi_comms%node_comm%size() - 1)
! PRIORITY 1: Check for incoming results from local workers
call iprobe(ctx%resources%mpi_comms%node_comm, MPI_ANY_SOURCE, TAG_WORKER_SCALAR_RESULT, has_result, status)
if (has_result) then
worker_source = status%MPI_SOURCE
! Safety check: worker should have a fragment assigned
if (worker_fragment_map(worker_source) == 0) then
call logger%error("Node coordinator received result from worker "//to_char(worker_source)// &
" but no fragment was assigned!")
call abort_comm(ctx%resources%mpi_comms%world_comm, 1)
end if
! Receive result from worker
call result_irecv(worker_result, ctx%resources%mpi_comms%node_comm, worker_source, TAG_WORKER_SCALAR_RESULT, req)
call wait(req)
! Check for calculation errors before forwarding
if (worker_result%has_error) then
call logger%error("Fragment "//to_char(worker_fragment_map(worker_source))// &
" calculation failed on worker "//to_char(worker_source)//": "// &
worker_result%error%get_message())
call abort_comm(ctx%resources%mpi_comms%world_comm, 1)
end if
! Forward results to global coordinator with fragment index
! (two-message protocol: fragment index first, then the result payload,
! both under TAG_NODE_SCALAR_RESULT — ordering must match the receiver).
call isend(ctx%resources%mpi_comms%world_comm, worker_fragment_map(worker_source), &
group_leader_rank, TAG_NODE_SCALAR_RESULT, req) ! fragment_idx
call wait(req)
call result_isend(worker_result, ctx%resources%mpi_comms%world_comm, group_leader_rank, TAG_NODE_SCALAR_RESULT, req) ! result
call wait(req)
! Clear the mapping
worker_fragment_map(worker_source) = 0
end if
! PRIORITY 2: Check for work requests from local workers
call iprobe(ctx%resources%mpi_comms%node_comm, MPI_ANY_SOURCE, TAG_WORKER_REQUEST, local_message_pending, status)
if (local_message_pending) then
! Only process work request if this worker doesn't have pending results.
! Otherwise the request stays queued and is re-probed on a later
! iteration, after PRIORITY 1 has drained the result and cleared the map.
if (worker_fragment_map(status%MPI_SOURCE) == 0) then
call irecv(ctx%resources%mpi_comms%node_comm, local_dummy, status%MPI_SOURCE, TAG_WORKER_REQUEST, req)
call wait(req)
if (more_fragments) then
! Relay the work request upstream and wait for the reply. The reply's
! tag distinguishes a new fragment (TAG_NODE_FRAGMENT) from a
! no-more-work signal; the payload lands in fragment_idx either way.
call isend(ctx%resources%mpi_comms%world_comm, dummy_msg, group_leader_rank, TAG_NODE_REQUEST, req)
call wait(req)
call irecv(ctx%resources%mpi_comms%world_comm, fragment_idx, group_leader_rank, MPI_ANY_TAG, req)
call wait(req, global_status)
if (global_status%MPI_TAG == TAG_NODE_FRAGMENT) then
! Receive fragment type (0 = monomer indices, 1 = intersection atom list)
call irecv(ctx%resources%mpi_comms%world_comm, fragment_type, group_leader_rank, TAG_NODE_FRAGMENT, req)
call wait(req)
call irecv(ctx%resources%mpi_comms%world_comm, fragment_size, group_leader_rank, TAG_NODE_FRAGMENT, req)
call wait(req)
! Note: must use blocking recv for allocatable arrays since size is unknown
allocate (fragment_indices(fragment_size))
call recv(ctx%resources%mpi_comms%world_comm, fragment_indices, group_leader_rank, &
TAG_NODE_FRAGMENT, global_status)
! Forward to worker
! (four-message protocol: idx, type, size, indices — the worker
! must receive them in exactly this order).
call isend(ctx%resources%mpi_comms%node_comm, fragment_idx, status%MPI_SOURCE, TAG_WORKER_FRAGMENT, req)
call wait(req)
call isend(ctx%resources%mpi_comms%node_comm, fragment_type, status%MPI_SOURCE, TAG_WORKER_FRAGMENT, req)
call wait(req)
call isend(ctx%resources%mpi_comms%node_comm, fragment_size, status%MPI_SOURCE, TAG_WORKER_FRAGMENT, req)
call wait(req)
call isend(ctx%resources%mpi_comms%node_comm, fragment_indices, status%MPI_SOURCE, TAG_WORKER_FRAGMENT, req)
call wait(req)
! Track which fragment was sent to this worker
worker_fragment_map(status%MPI_SOURCE) = fragment_idx
deallocate (fragment_indices)
else
! Upstream reported no more fragments: finish this worker and remember
! so later requests are answered locally without asking upstream again.
call isend(ctx%resources%mpi_comms%node_comm, -1, status%MPI_SOURCE, TAG_WORKER_FINISH, req)
call wait(req)
finished_workers = finished_workers + 1
more_fragments = .false.
end if
else
! Work already known to be exhausted: finish the worker directly.
call isend(ctx%resources%mpi_comms%node_comm, -1, status%MPI_SOURCE, TAG_WORKER_FINISH, req)
call wait(req)
finished_workers = finished_workers + 1
end if
end if
end if
end do
end subroutine node_coordinator_impl