node_coordinator_impl Subroutine

subroutine node_coordinator_impl(ctx)

Uses

  • mqc_many_body_expansion

[Uses graph omitted. The directly used module, mqc_many_body_expansion, in turn pulls in mqc_config_adapter, mqc_json_output_types, mqc_method_config, mqc_physical_fragment, mqc_resources, and pic_types.]

Internal implementation of node_coordinator with typed context

Arguments

Type                          Intent  Optional  Attributes  Name
class(many_body_expansion_t)  in                            ctx

Calls

[Call graph omitted. Direct calls from node_coordinator_impl: abort_comm, error, iprobe, irecv, isend, recv, to_char, error_t%error_get_message, get_group_leader_rank, group_global_coordinator_impl, result_irecv, and result_isend.]

Called by

[Called-by graph omitted. node_coordinator_impl is called by node_coordinator, the implementation behind the node_coordinator interface, which is in turn invoked from gmbe_context_t%gmbe_run_distributed and mbe_context_t%mbe_run_distributed.]
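
For orientation, the sketch below shows one way a distributed run could route a rank into this coordinator. It is illustrative only: the documented facts are the call chain above and the communicator rank/size queries used in the source code; the rank-0 dispatch rule and the node_worker routine name are assumptions made for this example.

   ! Hypothetical dispatch from a distributed MBE run (illustration only).
   subroutine run_distributed_sketch(ctx)
      use mqc_many_body_expansion, only: many_body_expansion_t
      class(many_body_expansion_t), intent(in) :: ctx

      if (ctx%resources%mpi_comms%node_comm%rank() == 0) then
         ! Assumption: the first rank on each node coordinates that node.
         ! Group leaders delegate further inside node_coordinator_impl.
         call node_coordinator(ctx)
      else
         ! Assumption: all other ranks run fragment calculations
         ! (node_worker is a hypothetical name, not part of this page).
         call node_worker(ctx)
      end if
   end subroutine run_distributed_sketch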

Variables

Local variables (all private to the procedure; type, attributes :: name):
integer(kind=int32), private :: dummy_msg
integer(kind=int32), private :: finished_workers
integer(kind=int64), private :: fragment_idx
integer(kind=int32), private, allocatable :: fragment_indices(:)
integer(kind=int32), private :: fragment_size
integer(kind=int32), private :: fragment_type
type(MPI_Status), private :: global_status
integer, private :: group_id
integer, private :: group_leader_rank
logical, private :: has_result
integer(kind=int32), private :: local_dummy
logical, private :: local_message_pending
logical, private :: more_fragments
type(request_t), private :: req
type(MPI_Status), private :: status
integer(kind=int64), private :: worker_fragment_map(ctx%resources%mpi_comms%node_comm%size())
type(calculation_result_t), private :: worker_result
integer(kind=int32), private :: worker_source

Source Code

   subroutine node_coordinator_impl(ctx)
      !! Internal implementation of node_coordinator with typed context
      use mqc_many_body_expansion, only: many_body_expansion_t
      class(many_body_expansion_t), intent(in) :: ctx

      integer :: group_leader_rank, group_id
      integer(int64) :: fragment_idx
      integer(int32) :: fragment_size, fragment_type, dummy_msg
      integer(int32) :: finished_workers
      integer(int32), allocatable :: fragment_indices(:)
      type(MPI_Status) :: status, global_status
      logical :: local_message_pending, more_fragments, has_result
      integer(int32) :: local_dummy

      ! For tracking worker-fragment mapping and collecting results
      integer(int64) :: worker_fragment_map(ctx%resources%mpi_comms%node_comm%size())
      integer(int32) :: worker_source
      type(calculation_result_t) :: worker_result

      ! MPI request handles for non-blocking operations
      type(request_t) :: req

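      ! Determine this rank's group leader. Group leaders do not act as plain
      ! node coordinators: they run the combined group/global coordinator
      ! instead and return as soon as it completes.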
      call get_group_leader_rank(ctx, ctx%resources%mpi_comms%world_comm%rank(), group_leader_rank, group_id)
      if (group_leader_rank == ctx%resources%mpi_comms%world_comm%rank()) then
         call group_global_coordinator_impl(ctx)
         return
      end if

      finished_workers = 0
      more_fragments = .true.
      dummy_msg = 0
      worker_fragment_map = 0

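      ! Event loop: run until every local worker has been told to finish.
      ! Each pass first drains any completed worker results (priority 1) and
      ! forwards them to the group leader, then services worker work requests
      ! (priority 2) by relaying either a fragment assignment or a finish
      ! signal from the group leader down to the requesting worker.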
      do while (finished_workers < ctx%resources%mpi_comms%node_comm%size() - 1)

         ! PRIORITY 1: Check for incoming results from local workers
         call iprobe(ctx%resources%mpi_comms%node_comm, MPI_ANY_SOURCE, TAG_WORKER_SCALAR_RESULT, has_result, status)
         if (has_result) then
            worker_source = status%MPI_SOURCE

            ! Safety check: worker should have a fragment assigned
            if (worker_fragment_map(worker_source) == 0) then
               call logger%error("Node coordinator received result from worker "//to_char(worker_source)// &
                                 " but no fragment was assigned!")
               call abort_comm(ctx%resources%mpi_comms%world_comm, 1)
            end if

            ! Receive result from worker
            call result_irecv(worker_result, ctx%resources%mpi_comms%node_comm, worker_source, &
                              TAG_WORKER_SCALAR_RESULT, req)
            call wait(req)

            ! Check for calculation errors before forwarding
            if (worker_result%has_error) then
               call logger%error("Fragment "//to_char(worker_fragment_map(worker_source))// &
                                 " calculation failed on worker "//to_char(worker_source)//": "// &
                                 worker_result%error%get_message())
               call abort_comm(ctx%resources%mpi_comms%world_comm, 1)
            end if

            ! Forward results to global coordinator with fragment index
            call isend(ctx%resources%mpi_comms%world_comm, worker_fragment_map(worker_source), &
                       group_leader_rank, TAG_NODE_SCALAR_RESULT, req)  ! fragment_idx
            call wait(req)
            call result_isend(worker_result, ctx%resources%mpi_comms%world_comm, group_leader_rank, &
                              TAG_NODE_SCALAR_RESULT, req)  ! result
            call wait(req)

            ! Clear the mapping
            worker_fragment_map(worker_source) = 0
         end if

         ! PRIORITY 2: Check for work requests from local workers
         call iprobe(ctx%resources%mpi_comms%node_comm, MPI_ANY_SOURCE, TAG_WORKER_REQUEST, local_message_pending, status)

         if (local_message_pending) then
            ! Only process work request if this worker doesn't have pending results
            if (worker_fragment_map(status%MPI_SOURCE) == 0) then
               call irecv(ctx%resources%mpi_comms%node_comm, local_dummy, status%MPI_SOURCE, TAG_WORKER_REQUEST, req)
               call wait(req)

               if (more_fragments) then
                  call isend(ctx%resources%mpi_comms%world_comm, dummy_msg, group_leader_rank, TAG_NODE_REQUEST, req)
                  call wait(req)
                  call irecv(ctx%resources%mpi_comms%world_comm, fragment_idx, group_leader_rank, MPI_ANY_TAG, req)
                  call wait(req, global_status)

                  if (global_status%MPI_TAG == TAG_NODE_FRAGMENT) then
                     ! Receive fragment type (0 = monomer indices, 1 = intersection atom list)
                      call irecv(ctx%resources%mpi_comms%world_comm, fragment_type, group_leader_rank, &
                                 TAG_NODE_FRAGMENT, req)
                      call wait(req)
                      call irecv(ctx%resources%mpi_comms%world_comm, fragment_size, group_leader_rank, &
                                 TAG_NODE_FRAGMENT, req)
                      call wait(req)
                     ! Note: must use blocking recv for allocatable arrays since size is unknown
                     allocate (fragment_indices(fragment_size))
                     call recv(ctx%resources%mpi_comms%world_comm, fragment_indices, group_leader_rank, &
                               TAG_NODE_FRAGMENT, global_status)

                     ! Forward to worker
                      call isend(ctx%resources%mpi_comms%node_comm, fragment_idx, status%MPI_SOURCE, &
                                 TAG_WORKER_FRAGMENT, req)
                      call wait(req)
                      call isend(ctx%resources%mpi_comms%node_comm, fragment_type, status%MPI_SOURCE, &
                                 TAG_WORKER_FRAGMENT, req)
                      call wait(req)
                      call isend(ctx%resources%mpi_comms%node_comm, fragment_size, status%MPI_SOURCE, &
                                 TAG_WORKER_FRAGMENT, req)
                      call wait(req)
                      call isend(ctx%resources%mpi_comms%node_comm, fragment_indices, status%MPI_SOURCE, &
                                 TAG_WORKER_FRAGMENT, req)
                      call wait(req)

                     ! Track which fragment was sent to this worker
                     worker_fragment_map(status%MPI_SOURCE) = fragment_idx

                     deallocate (fragment_indices)
                  else
                     call isend(ctx%resources%mpi_comms%node_comm, -1, status%MPI_SOURCE, TAG_WORKER_FINISH, req)
                     call wait(req)
                     finished_workers = finished_workers + 1
                     more_fragments = .false.
                  end if
               else
                  call isend(ctx%resources%mpi_comms%node_comm, -1, status%MPI_SOURCE, TAG_WORKER_FINISH, req)
                  call wait(req)
                  finished_workers = finished_workers + 1
               end if
            end if
         end if
      end do
   end subroutine node_coordinator_impl
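
The worker side of this exchange is not part of this page. The sketch below is a minimal reconstruction inferred from the tags the coordinator uses above (TAG_WORKER_REQUEST, TAG_WORKER_FRAGMENT, TAG_WORKER_FINISH, TAG_WORKER_SCALAR_RESULT). The assumption that the node coordinator is local rank 0, the send call signature (assumed to mirror recv), and the run_fragment_calculation routine are illustrative, not taken from the library.

   subroutine node_worker_sketch(ctx)
      !! Illustrative worker loop matching the coordinator protocol above.
      !! Kind parameters, tags, and MPI wrapper routines are assumed to come
      !! from the same module imports as node_coordinator_impl.
      use mqc_many_body_expansion, only: many_body_expansion_t
      class(many_body_expansion_t), intent(in) :: ctx

      integer(int32) :: dummy, finish_flag, fragment_type, fragment_size
      integer(int64) :: fragment_idx
      integer(int32), allocatable :: fragment_indices(:)
      type(calculation_result_t) :: result
      type(request_t) :: req
      type(MPI_Status) :: status
      logical :: pending

      dummy = 0
      do
         ! Ask the node coordinator (assumed to be local rank 0) for work.
         call send(ctx%resources%mpi_comms%node_comm, dummy, 0, TAG_WORKER_REQUEST)

         ! Inspect the tag of the reply before posting a receive, because a
         ! fragment assignment and a finish signal carry different payloads.
         pending = .false.
         do while (.not. pending)
            call iprobe(ctx%resources%mpi_comms%node_comm, 0, MPI_ANY_TAG, pending, status)
         end do

         if (status%MPI_TAG == TAG_WORKER_FINISH) then
            call recv(ctx%resources%mpi_comms%node_comm, finish_flag, 0, TAG_WORKER_FINISH, status)
            exit
         end if

         ! Fragment assignment: index, type, size, then the index list.
         call recv(ctx%resources%mpi_comms%node_comm, fragment_idx, 0, TAG_WORKER_FRAGMENT, status)
         call recv(ctx%resources%mpi_comms%node_comm, fragment_type, 0, TAG_WORKER_FRAGMENT, status)
         call recv(ctx%resources%mpi_comms%node_comm, fragment_size, 0, TAG_WORKER_FRAGMENT, status)
         allocate (fragment_indices(fragment_size))
         call recv(ctx%resources%mpi_comms%node_comm, fragment_indices, 0, TAG_WORKER_FRAGMENT, status)

         ! Run the fragment calculation (hypothetical routine name).
         call run_fragment_calculation(ctx, fragment_type, fragment_indices, result)

         ! Return the result; the coordinator forwards it to the group leader.
         call result_isend(result, ctx%resources%mpi_comms%node_comm, 0, TAG_WORKER_SCALAR_RESULT, req)
         call wait(req)
         deallocate (fragment_indices)
      end do
   end subroutine node_worker_sketch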