Node coordinator for distributing fragments to local workers. Handles work requests and result collection from local workers.
| Type | Intent | Optional | Attributes | Name | Description |
|---|---|---|---|---|---|
| type(resources_t) | intent(in) | | | resources | |
| type(method_config_t) | intent(in) | | | method_config | Method configuration (passed through to workers) |
| integer(kind=int32) | intent(in) | | | calc_type | |
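
For context, a minimal dispatch sketch of how this routine might be entered, assuming the node-local rank 0 takes the coordinator role while the remaining ranks run the worker loop. `node_worker` is a hypothetical name for the worker-side routine, and the `rank()` accessor on the communicator wrapper is an assumption made by analogy with the `size()` call used in the listing below.

```fortran
! Sketch only: rank 0 of the node communicator is assumed to act as coordinator.
! node_worker is a hypothetical placeholder for the worker-side routine.
if (resources%mpi_comms%node_comm%rank() == 0) then
    call node_coordinator(resources, method_config, calc_type)
else
    call node_worker(resources, method_config, calc_type)
end if
```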
| Type | Visibility | Attributes | Name | Initial |
|---|---|---|---|---|
| integer(kind=int32) | private | | dummy_msg | |
| integer(kind=int32) | private | | finished_workers | |
| integer(kind=int64) | private | | fragment_idx | |
| integer(kind=int32) | private | allocatable | fragment_indices(:) | |
| integer(kind=int32) | private | | fragment_size | |
| integer(kind=int32) | private | | fragment_type | |
| type(MPI_Status) | private | | global_status | |
| logical | private | | has_result | |
| integer(kind=int32) | private | | local_dummy | |
| logical | private | | local_message_pending | |
| logical | private | | more_fragments | |
| type(request_t) | private | | req | |
| type(MPI_Status) | private | | status | |
| integer(kind=int64) | private | | worker_fragment_map(resources%mpi_comms%node_comm%size()) | |
| type(calculation_result_t) | private | | worker_result | |
| integer(kind=int32) | private | | worker_source | |
```fortran
module subroutine node_coordinator(resources, method_config, calc_type)
    !! Node coordinator for distributing fragments to local workers
    !! Handles work requests and result collection from local workers
    type(resources_t), intent(in) :: resources
    type(method_config_t), intent(in) :: method_config
        !! Method configuration (passed through to workers)
    integer(int32), intent(in) :: calc_type

    integer(int64) :: fragment_idx
    integer(int32) :: fragment_size, fragment_type, dummy_msg
    integer(int32) :: finished_workers
    integer(int32), allocatable :: fragment_indices(:)
    type(MPI_Status) :: status, global_status
    logical :: local_message_pending, more_fragments, has_result
    integer(int32) :: local_dummy

    ! For tracking worker-fragment mapping and collecting results
    integer(int64) :: worker_fragment_map(resources%mpi_comms%node_comm%size())
    integer(int32) :: worker_source
    type(calculation_result_t) :: worker_result

    ! MPI request handles for non-blocking operations
    type(request_t) :: req

    finished_workers = 0
    more_fragments = .true.
    dummy_msg = 0
    worker_fragment_map = 0

    do while (finished_workers < resources%mpi_comms%node_comm%size() - 1)

        ! PRIORITY 1: Check for incoming results from local workers
        call iprobe(resources%mpi_comms%node_comm, MPI_ANY_SOURCE, TAG_WORKER_SCALAR_RESULT, has_result, status)

        if (has_result) then
            worker_source = status%MPI_SOURCE

            ! Safety check: worker should have a fragment assigned
            if (worker_fragment_map(worker_source) == 0) then
                call logger%error("Node coordinator received result from worker "//to_char(worker_source)// &
                                  " but no fragment was assigned!")
                call abort_comm(resources%mpi_comms%world_comm, 1)
            end if

            ! Receive result from worker
            call result_irecv(worker_result, resources%mpi_comms%node_comm, worker_source, TAG_WORKER_SCALAR_RESULT, req)
            call wait(req)

            ! Check for calculation errors before forwarding
            if (worker_result%has_error) then
                call logger%error("Fragment "//to_char(worker_fragment_map(worker_source))// &
                                  " calculation failed on worker "//to_char(worker_source)//": "// &
                                  worker_result%error%get_message())
                call abort_comm(resources%mpi_comms%world_comm, 1)
            end if

            ! Forward results to global coordinator with fragment index
            call isend(resources%mpi_comms%world_comm, worker_fragment_map(worker_source), 0, TAG_NODE_SCALAR_RESULT, req) ! fragment_idx
            call wait(req)
            call result_isend(worker_result, resources%mpi_comms%world_comm, 0, TAG_NODE_SCALAR_RESULT, req) ! result
            call wait(req)

            ! Clear the mapping
            worker_fragment_map(worker_source) = 0
        end if

        ! PRIORITY 2: Check for work requests from local workers
        call iprobe(resources%mpi_comms%node_comm, MPI_ANY_SOURCE, TAG_WORKER_REQUEST, local_message_pending, status)

        if (local_message_pending) then
            ! Only process work request if this worker doesn't have pending results
            if (worker_fragment_map(status%MPI_SOURCE) == 0) then
                call irecv(resources%mpi_comms%node_comm, local_dummy, status%MPI_SOURCE, TAG_WORKER_REQUEST, req)
                call wait(req)

                if (more_fragments) then
                    call isend(resources%mpi_comms%world_comm, dummy_msg, 0, TAG_NODE_REQUEST, req)
                    call wait(req)

                    call irecv(resources%mpi_comms%world_comm, fragment_idx, 0, MPI_ANY_TAG, req)
                    call wait(req, global_status)

                    if (global_status%MPI_TAG == TAG_NODE_FRAGMENT) then
                        ! Receive fragment type (0 = monomer indices, 1 = intersection atom list)
                        call irecv(resources%mpi_comms%world_comm, fragment_type, 0, TAG_NODE_FRAGMENT, req)
                        call wait(req)
                        call irecv(resources%mpi_comms%world_comm, fragment_size, 0, TAG_NODE_FRAGMENT, req)
                        call wait(req)

                        ! Note: must use blocking recv for allocatable arrays since size is unknown
                        allocate (fragment_indices(fragment_size))
                        call recv(resources%mpi_comms%world_comm, fragment_indices, 0, TAG_NODE_FRAGMENT, global_status)

                        ! Forward to worker
                        call isend(resources%mpi_comms%node_comm, fragment_idx, status%MPI_SOURCE, TAG_WORKER_FRAGMENT, req)
                        call wait(req)
                        call isend(resources%mpi_comms%node_comm, fragment_type, status%MPI_SOURCE, TAG_WORKER_FRAGMENT, req)
                        call wait(req)
                        call isend(resources%mpi_comms%node_comm, fragment_size, status%MPI_SOURCE, TAG_WORKER_FRAGMENT, req)
                        call wait(req)
                        call isend(resources%mpi_comms%node_comm, fragment_indices, status%MPI_SOURCE, TAG_WORKER_FRAGMENT, req)
                        call wait(req)

                        ! Track which fragment was sent to this worker
                        worker_fragment_map(status%MPI_SOURCE) = fragment_idx

                        deallocate (fragment_indices)
                    else
                        call isend(resources%mpi_comms%node_comm, -1, status%MPI_SOURCE, TAG_WORKER_FINISH, req)
                        call wait(req)
                        finished_workers = finished_workers + 1
                        more_fragments = .false.
                    end if
                else
                    call isend(resources%mpi_comms%node_comm, -1, status%MPI_SOURCE, TAG_WORKER_FINISH, req)
                    call wait(req)
                    finished_workers = finished_workers + 1
                end if
            end if
        end if
    end do
end subroutine node_coordinator
```
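
The worker side of this exchange is not documented on this page. The sketch below illustrates, under assumptions, the counterpart loop a local worker might run against the coordinator above: rank 0 of `node_comm` is assumed to be the coordinator, the same `isend`/`irecv`/`recv`/`wait`/`result_isend` wrappers and message tags are reused from the listing, and `run_fragment_calculation` is a hypothetical placeholder for the actual fragment computation.

```fortran
! Hypothetical worker-side counterpart (sketch, not project code).
! Assumes rank 0 on node_comm is the node coordinator shown above.
subroutine node_worker_sketch(resources, method_config, calc_type)
    type(resources_t), intent(in) :: resources
    type(method_config_t), intent(in) :: method_config
    integer(int32), intent(in) :: calc_type

    integer(int64) :: fragment_idx
    integer(int32) :: fragment_type, fragment_size
    integer(int32), allocatable :: fragment_indices(:)
    type(calculation_result_t) :: result
    type(request_t) :: req
    type(MPI_Status) :: status

    do
        ! Ask the node coordinator for work (payload is a dummy integer)
        call isend(resources%mpi_comms%node_comm, 0, 0, TAG_WORKER_REQUEST, req)
        call wait(req)

        ! Reply is either a fragment index (TAG_WORKER_FRAGMENT) or a finish signal (TAG_WORKER_FINISH)
        call irecv(resources%mpi_comms%node_comm, fragment_idx, 0, MPI_ANY_TAG, req)
        call wait(req, status)
        if (status%MPI_TAG == TAG_WORKER_FINISH) exit

        ! Fragment metadata arrives in the same order the coordinator sends it
        call irecv(resources%mpi_comms%node_comm, fragment_type, 0, TAG_WORKER_FRAGMENT, req)
        call wait(req)
        call irecv(resources%mpi_comms%node_comm, fragment_size, 0, TAG_WORKER_FRAGMENT, req)
        call wait(req)
        allocate (fragment_indices(fragment_size))
        call recv(resources%mpi_comms%node_comm, fragment_indices, 0, TAG_WORKER_FRAGMENT, status)

        ! run_fragment_calculation is a placeholder for the actual worker computation
        result = run_fragment_calculation(fragment_indices, fragment_type, method_config, calc_type)

        ! Return the result; the coordinator forwards it to the global coordinator
        call result_isend(result, resources%mpi_comms%node_comm, 0, TAG_WORKER_SCALAR_RESULT, req)
        call wait(req)

        deallocate (fragment_indices)
    end do
end subroutine node_worker_sketch
```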