Global coordinator for distributing fragments to node coordinators. For a single-node calculation it also acts as the node coordinator. Uses `int64` for `total_fragments` so that large fragment counts do not overflow `int32`. Bond connectivity is accessed via `sys_geom%bonds`. An illustrative calling sketch is given after the argument table below.
| Type | Intent | Optional | Attributes | Name | Description |
|---|---|---|---|---|---|
| type(resources_t) | intent(in) | | | resources | |
| integer(kind=int64) | intent(in) | | | total_fragments | |
| integer | intent(in) | | | polymers(:,:) | |
| integer | intent(in) | | | max_level | |
| integer | intent(in) | | | node_leader_ranks(:) | |
| integer | intent(in) | | | num_nodes | |
| type(system_geometry_t) | intent(in) | optional | | sys_geom | |
| type(method_config_t) | intent(in) | | | method_config | Method configuration |
| integer(kind=int32) | intent(in) | | | calc_type | |
| type(json_output_data_t) | intent(out) | optional | | json_data | JSON output data |
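For orientation, here is a minimal calling sketch. It is not taken from the library: the enclosing program, the setup step, and the module locations of `resources_t`, `system_geometry_t`, and `method_config_t` are assumptions; only the `global_coordinator` call itself follows the interface documented above.

```fortran
! Illustrative sketch only: everything except the global_coordinator call is
! an assumption about how a driver might be organized.
program mbe_driver_sketch
   use iso_fortran_env, only: int32, int64
   use mqc_json_output_types, only: json_output_data_t
   ! use statements for resources_t, system_geometry_t, and method_config_t are
   ! omitted here because their host modules are not shown on this page.
   implicit none

   type(resources_t)        :: resources
   type(system_geometry_t)  :: sys_geom
   type(method_config_t)    :: method_config
   type(json_output_data_t) :: json_data
   integer, allocatable     :: polymers(:, :), node_leader_ranks(:)
   integer(int64)           :: total_fragments
   integer(int32)           :: calc_type
   integer                  :: max_level, num_nodes

   ! ... initialize MPI resources, geometry, method configuration, the polymer
   !     list, node leader ranks, and the counts (application-specific) ...

   ! Only the designated global coordinator rank should make this call;
   ! sys_geom and json_data are optional arguments.
   call global_coordinator(resources, total_fragments, polymers, max_level, &
                           node_leader_ranks, num_nodes, sys_geom=sys_geom, &
                           method_config=method_config, calc_type=calc_type, &
                           json_data=json_data)
end program mbe_driver_sketch
```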
| Type | Visibility | Attributes | Name | Initial |
|---|---|---|---|---|
| integer(kind=int32) | private | | calc_type_local | |
| type(timer_type) | private | | coord_timer | |
| integer(kind=int64) | private | | current_fragment | |
| integer | private | | dummy_msg | |
| integer | private | | finished_nodes | |
| integer(kind=int64) | private | | fragment_idx | |
| logical | private | | handling_local_workers | |
| logical | private | | has_pending | |
| integer | private | | local_dummy | |
| integer | private | | local_finished_workers | |
| type(MPI_Status) | private | | local_status | |
| type(request_t) | private | | req | |
| integer | private | | request_source | |
| type(calculation_result_t) | private | allocatable | results(:) | |
| integer(kind=int64) | private | | results_received | |
| type(MPI_Status) | private | | status | |
| integer(kind=int64) | private | | worker_fragment_map(resources%mpi_comms%node_comm%size()) | |
| integer | private | | worker_source | |
```fortran
module subroutine global_coordinator(resources, total_fragments, polymers, max_level, &
                                     node_leader_ranks, num_nodes, sys_geom, method_config, calc_type, json_data)
   !! Global coordinator for distributing fragments to node coordinators
   !! will act as a node coordinator for a single node calculation
   !! Uses int64 for total_fragments to handle large fragment counts that overflow int32.
   !! Bond connectivity is accessed via sys_geom%bonds
   use mqc_json_output_types, only: json_output_data_t

   type(resources_t), intent(in) :: resources
   integer(int64), intent(in) :: total_fragments
   integer, intent(in) :: max_level, num_nodes
   integer, intent(in) :: polymers(:, :), node_leader_ranks(:)
   type(system_geometry_t), intent(in), optional :: sys_geom
   type(method_config_t), intent(in) :: method_config !! Method configuration
   integer(int32), intent(in) :: calc_type
   type(json_output_data_t), intent(out), optional :: json_data !! JSON output data

   type(timer_type) :: coord_timer
   integer(int64) :: current_fragment, results_received
   integer :: finished_nodes
   integer :: request_source, dummy_msg
   integer(int64) :: fragment_idx
   type(MPI_Status) :: status, local_status
   logical :: handling_local_workers
   logical :: has_pending
   integer(int32) :: calc_type_local

   ! For local workers
   integer :: local_finished_workers, local_dummy

   ! Storage for results
   type(calculation_result_t), allocatable :: results(:)
   integer(int64) :: worker_fragment_map(resources%mpi_comms%node_comm%size())
   integer :: worker_source

   ! MPI request handles for non-blocking operations
   type(request_t) :: req

   calc_type_local = calc_type
   current_fragment = total_fragments
   finished_nodes = 0
   local_finished_workers = 0
   handling_local_workers = (resources%mpi_comms%node_comm%size() > 1)
   results_received = 0_int64

   ! Allocate storage for results
   allocate (results(total_fragments))
   worker_fragment_map = 0

   call logger%verbose("Global coordinator starting with "//to_char(total_fragments)// &
                       " fragments for "//to_char(num_nodes)//" nodes")

   call coord_timer%start()

   do while (finished_nodes < num_nodes)

      ! PRIORITY 1: Check for incoming results from local workers
      ! This MUST be checked before sending new work to avoid race conditions
      if (handling_local_workers) then
         ! Keep checking for results until there are none pending
         do
            call iprobe(resources%mpi_comms%node_comm, MPI_ANY_SOURCE, TAG_WORKER_SCALAR_RESULT, has_pending, local_status)
            if (.not. has_pending) exit

            worker_source = local_status%MPI_SOURCE

            ! Safety check: worker should have a fragment assigned
            if (worker_fragment_map(worker_source) == 0) then
               call logger%error("Received result from worker "//to_char(worker_source)// &
                                 " but no fragment was assigned!")
               call abort_comm(resources%mpi_comms%world_comm, 1)
            end if

            ! Receive result and store it using the fragment index for this worker
            call result_irecv(results(worker_fragment_map(worker_source)), resources%mpi_comms%node_comm, worker_source, &
                              TAG_WORKER_SCALAR_RESULT, req)
            call wait(req)

            ! Check for calculation errors from worker
            if (results(worker_fragment_map(worker_source))%has_error) then
               call logger%error("Fragment "//to_char(worker_fragment_map(worker_source))// &
                                 " calculation failed: "// &
                                 results(worker_fragment_map(worker_source))%error%get_message())
               call abort_comm(resources%mpi_comms%world_comm, 1)
            end if

            ! Clear the mapping since we've received the result
            worker_fragment_map(worker_source) = 0

            results_received = results_received + 1
            if (mod(results_received, max(1_int64, total_fragments/10)) == 0 .or. &
                results_received == total_fragments) then
               call logger%info(" Processed "//to_char(results_received)//"/"// &
                                to_char(total_fragments)//" fragments ["// &
                                to_char(coord_timer%get_elapsed_time())//" s]")
            end if
         end do
      end if

      ! PRIORITY 1b: Check for incoming results from remote node coordinators
      do
         call iprobe(resources%mpi_comms%world_comm, MPI_ANY_SOURCE, TAG_NODE_SCALAR_RESULT, has_pending, status)
         if (.not. has_pending) exit

         ! Receive fragment index and result from node coordinator
         ! TODO: serialize the data for better performance
         call irecv(resources%mpi_comms%world_comm, fragment_idx, status%MPI_SOURCE, TAG_NODE_SCALAR_RESULT, req)
         call wait(req)
         call result_irecv(results(fragment_idx), resources%mpi_comms%world_comm, status%MPI_SOURCE, TAG_NODE_SCALAR_RESULT, req)
         call wait(req)

         ! Check for calculation errors from node coordinator
         if (results(fragment_idx)%has_error) then
            call logger%error("Fragment "//to_char(fragment_idx)//" calculation failed: "// &
                              results(fragment_idx)%error%get_message())
            call abort_comm(resources%mpi_comms%world_comm, 1)
         end if

         results_received = results_received + 1
         if (mod(results_received, max(1_int64, total_fragments/10)) == 0 .or. &
             results_received == total_fragments) then
            call logger%info(" Processed "//to_char(results_received)//"/"// &
                             to_char(total_fragments)//" fragments ["// &
                             to_char(coord_timer%get_elapsed_time())//" s]")
         end if
      end do

      ! PRIORITY 2: Remote node coordinator requests
      call iprobe(resources%mpi_comms%world_comm, MPI_ANY_SOURCE, TAG_NODE_REQUEST, has_pending, status)
      if (has_pending) then
         call irecv(resources%mpi_comms%world_comm, dummy_msg, status%MPI_SOURCE, TAG_NODE_REQUEST, req)
         call wait(req)
         request_source = status%MPI_SOURCE

         if (current_fragment >= 1) then
            call send_fragment_to_node(resources%mpi_comms%world_comm, current_fragment, polymers, request_source)
            current_fragment = current_fragment - 1
         else
            call isend(resources%mpi_comms%world_comm, -1, request_source, TAG_NODE_FINISH, req)
            call wait(req)
            finished_nodes = finished_nodes + 1
         end if
      end if

      ! PRIORITY 3: Local workers (shared memory) - send new work
      if (handling_local_workers .and. local_finished_workers < resources%mpi_comms%node_comm%size() - 1) then
         call iprobe(resources%mpi_comms%node_comm, MPI_ANY_SOURCE, TAG_WORKER_REQUEST, has_pending, local_status)
         if (has_pending) then
            ! Only process work request if this worker doesn't have pending results
            if (worker_fragment_map(local_status%MPI_SOURCE) == 0) then
               call irecv(resources%mpi_comms%node_comm, local_dummy, local_status%MPI_SOURCE, TAG_WORKER_REQUEST, req)
               call wait(req)

               if (current_fragment >= 1) then
                  call send_fragment_to_worker(resources%mpi_comms%node_comm, current_fragment, polymers, &
                                               local_status%MPI_SOURCE)
                  ! Track which fragment was sent to this worker
                  worker_fragment_map(local_status%MPI_SOURCE) = current_fragment
                  current_fragment = current_fragment - 1
               else
                  call isend(resources%mpi_comms%node_comm, -1, local_status%MPI_SOURCE, TAG_WORKER_FINISH, req)
                  call wait(req)
                  local_finished_workers = local_finished_workers + 1
               end if
            end if
            ! If worker still has pending results, skip the work request
            ! It will be processed on the next iteration after results are received
         end if
      end if

      ! Finalize local worker completion
      if (handling_local_workers .and. local_finished_workers >= resources%mpi_comms%node_comm%size() - 1 &
          .and. results_received >= total_fragments) then
         handling_local_workers = .false.
         if (num_nodes == 1) then
            finished_nodes = finished_nodes + 1
            call logger%debug("Manually incremented finished_nodes for self")
         else
            finished_nodes = finished_nodes + 1
            call logger%verbose("Global coordinator finished local workers")
         end if
      end if
   end do

   call logger%verbose("Global coordinator finished all fragments")
   call coord_timer%stop()
   call logger%info("Time to evaluate all fragments "//to_char(coord_timer%get_elapsed_time())//" s")

   block
      use mqc_result_types, only: mbe_result_t
      type(mbe_result_t) :: mbe_result

      ! Compute the many-body expansion
      call logger%info(" ")
      call logger%info("Computing Many-Body Expansion (MBE)...")
      call coord_timer%start()

      ! Allocate mbe_result components based on calc_type
      call mbe_result%allocate_dipole() ! Always compute dipole
      if (calc_type_local == CALC_TYPE_HESSIAN) then
         if (.not. present(sys_geom)) then
            call logger%error("sys_geom required for Hessian calculation in global_coordinator")
            call abort_comm(resources%mpi_comms%world_comm, 1)
         end if
         call mbe_result%allocate_gradient(sys_geom%total_atoms)
         call mbe_result%allocate_hessian(sys_geom%total_atoms)
      else if (calc_type_local == CALC_TYPE_GRADIENT) then
         if (.not. present(sys_geom)) then
            call logger%error("sys_geom required for gradient calculation in global_coordinator")
            call abort_comm(resources%mpi_comms%world_comm, 1)
         end if
         call mbe_result%allocate_gradient(sys_geom%total_atoms)
      end if

      call compute_mbe(polymers, total_fragments, max_level, results, mbe_result, &
                       sys_geom, resources%mpi_comms%world_comm, json_data)

      call mbe_result%destroy()
      call coord_timer%stop()
      call logger%info("Time to compute MBE "//to_char(coord_timer%get_elapsed_time())//" s")
   end block

   ! Cleanup
   deallocate (results)

end subroutine global_coordinator
```
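The coordinator above implements one side of a simple request/dispatch/finish handshake. The sketch below shows, in plain `mpi_f08`, the shape of the worker-side loop it pairs with. It is an illustration only: the library's real worker routine is not shown on this page, so the tag values, the `TAG_WORKER_FRAGMENT` name, the single-integer fragment payload, and the bare real result (the coordinator actually receives a `calculation_result_t` via `result_irecv`) are all assumptions.

```fortran
! Hypothetical worker-side counterpart, shown only to illustrate the
! request/dispatch/finish handshake used by the coordinator.
subroutine worker_loop_sketch(node_comm, coordinator_rank)
   use mpi_f08
   use iso_fortran_env, only: real64
   implicit none
   type(MPI_Comm), intent(in) :: node_comm
   integer, intent(in) :: coordinator_rank

   ! Placeholder tag values; the real constants live elsewhere in the library.
   integer, parameter :: TAG_WORKER_REQUEST = 101, TAG_WORKER_FRAGMENT = 102, &
                         TAG_WORKER_FINISH = 103, TAG_WORKER_SCALAR_RESULT = 104
   integer :: dummy, fragment_idx
   real(real64) :: scalar_result
   type(MPI_Status) :: status

   dummy = 0
   do
      ! Ask the coordinator for work (matches TAG_WORKER_REQUEST in PRIORITY 3).
      call MPI_Send(dummy, 1, MPI_INTEGER, coordinator_rank, TAG_WORKER_REQUEST, node_comm)

      ! Wait for either a fragment or the finish signal and branch on the tag.
      call MPI_Probe(coordinator_rank, MPI_ANY_TAG, node_comm, status)
      if (status%MPI_TAG == TAG_WORKER_FINISH) then
         call MPI_Recv(dummy, 1, MPI_INTEGER, coordinator_rank, TAG_WORKER_FINISH, node_comm, status)
         exit
      end if

      ! Assumed payload: a single fragment index (the library itself uses int64
      ! indices and sends the fragment definition via send_fragment_to_worker).
      call MPI_Recv(fragment_idx, 1, MPI_INTEGER, coordinator_rank, TAG_WORKER_FRAGMENT, node_comm, status)

      scalar_result = 0.0_real64   ! ... run the fragment calculation here ...

      ! Return the result; the coordinator stores it using worker_fragment_map.
      call MPI_Send(scalar_result, 1, MPI_DOUBLE_PRECISION, coordinator_rank, &
                    TAG_WORKER_SCALAR_RESULT, node_comm)
   end do
end subroutine worker_loop_sketch
```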