node_coordinator Module Subroutine

module subroutine node_coordinator(resources, method_config, calc_type)

Node coordinator for distributing fragments to local workers. Handles work requests and result collection from local workers.

Arguments

type(resources_t), intent(in) :: resources
type(method_config_t), intent(in) :: method_config
    Method configuration (passed through to workers)
integer(kind=int32), intent(in) :: calc_type

Calls

node_coordinator calls: abort_comm, error, error_t%error_get_message, iprobe, irecv, isend, recv, result_irecv, result_isend, to_char

Called by

node_coordinator is called, through the node_coordinator interface, by gmbe_context_t%gmbe_run_distributed and mbe_context_t%mbe_run_distributed.
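
Both callers are expected to run this routine on one rank per node, with the remaining node-local ranks acting as workers. The snippet below is a minimal sketch of one possible call site, assuming the node coordinator runs on rank 0 of node_comm; the rank() query and the worker_loop routine (sketched after the source code below) are illustrative assumptions, not part of this module.

   ! Sketch only: rank() and worker_loop are assumed names, not documented API
   if (resources%mpi_comms%node_comm%rank() == 0) then
      ! One coordinator per node hands fragments to its local workers
      call node_coordinator(resources, method_config, calc_type)
   else
      ! Every other node-local rank processes fragments from the coordinator
      call worker_loop(resources, method_config, calc_type)
   end if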

Variables

integer(kind=int32), private :: dummy_msg
integer(kind=int32), private :: finished_workers
integer(kind=int64), private :: fragment_idx
integer(kind=int32), private, allocatable :: fragment_indices(:)
integer(kind=int32), private :: fragment_size
integer(kind=int32), private :: fragment_type
type(MPI_Status), private :: global_status
logical, private :: has_result
integer(kind=int32), private :: local_dummy
logical, private :: local_message_pending
logical, private :: more_fragments
type(request_t), private :: req
type(MPI_Status), private :: status
integer(kind=int64), private :: worker_fragment_map(resources%mpi_comms%node_comm%size())
type(calculation_result_t), private :: worker_result
integer(kind=int32), private :: worker_source

Source Code

   module subroutine node_coordinator(resources, method_config, calc_type)
      !! Node coordinator for distributing fragments to local workers
      !! Handles work requests and result collection from local workers
      type(resources_t), intent(in) :: resources
      type(method_config_t), intent(in) :: method_config  !! Method configuration (passed through to workers)
      integer(int32), intent(in) :: calc_type

      integer(int64) :: fragment_idx
      integer(int32) :: fragment_size, fragment_type, dummy_msg
      integer(int32) :: finished_workers
      integer(int32), allocatable :: fragment_indices(:)
      type(MPI_Status) :: status, global_status
      logical :: local_message_pending, more_fragments, has_result
      integer(int32) :: local_dummy

      ! For tracking worker-fragment mapping and collecting results
      integer(int64) :: worker_fragment_map(resources%mpi_comms%node_comm%size())
      integer(int32) :: worker_source
      type(calculation_result_t) :: worker_result

      ! MPI request handles for non-blocking operations
      type(request_t) :: req

      finished_workers = 0
      more_fragments = .true.
      dummy_msg = 0
      worker_fragment_map = 0

      do while (finished_workers < resources%mpi_comms%node_comm%size() - 1)

         ! PRIORITY 1: Check for incoming results from local workers
         call iprobe(resources%mpi_comms%node_comm, MPI_ANY_SOURCE, TAG_WORKER_SCALAR_RESULT, has_result, status)
         if (has_result) then
            worker_source = status%MPI_SOURCE

            ! Safety check: worker should have a fragment assigned
            if (worker_fragment_map(worker_source) == 0) then
               call logger%error("Node coordinator received result from worker "//to_char(worker_source)// &
                                 " but no fragment was assigned!")
               call abort_comm(resources%mpi_comms%world_comm, 1)
            end if

            ! Receive result from worker
            call result_irecv(worker_result, resources%mpi_comms%node_comm, worker_source, TAG_WORKER_SCALAR_RESULT, req)
            call wait(req)

            ! Check for calculation errors before forwarding
            if (worker_result%has_error) then
               call logger%error("Fragment "//to_char(worker_fragment_map(worker_source))// &
                                 " calculation failed on worker "//to_char(worker_source)//": "// &
                                 worker_result%error%get_message())
               call abort_comm(resources%mpi_comms%world_comm, 1)
            end if

            ! Forward results to global coordinator with fragment index:
            ! first the fragment index, then the result payload
            call isend(resources%mpi_comms%world_comm, worker_fragment_map(worker_source), 0, TAG_NODE_SCALAR_RESULT, req)
            call wait(req)
            call result_isend(worker_result, resources%mpi_comms%world_comm, 0, TAG_NODE_SCALAR_RESULT, req)
            call wait(req)

            ! Clear the mapping
            worker_fragment_map(worker_source) = 0
         end if

         ! PRIORITY 2: Check for work requests from local workers
         call iprobe(resources%mpi_comms%node_comm, MPI_ANY_SOURCE, TAG_WORKER_REQUEST, local_message_pending, status)

         if (local_message_pending) then
            ! Only process work request if this worker doesn't have pending results
            if (worker_fragment_map(status%MPI_SOURCE) == 0) then
               call irecv(resources%mpi_comms%node_comm, local_dummy, status%MPI_SOURCE, TAG_WORKER_REQUEST, req)
               call wait(req)

               if (more_fragments) then
                  call isend(resources%mpi_comms%world_comm, dummy_msg, 0, TAG_NODE_REQUEST, req)
                  call wait(req)
                  call irecv(resources%mpi_comms%world_comm, fragment_idx, 0, MPI_ANY_TAG, req)
                  call wait(req, global_status)

                  if (global_status%MPI_TAG == TAG_NODE_FRAGMENT) then
                     ! Receive fragment type (0 = monomer indices, 1 = intersection atom list)
                     call irecv(resources%mpi_comms%world_comm, fragment_type, 0, TAG_NODE_FRAGMENT, req)
                     call wait(req)
                     call irecv(resources%mpi_comms%world_comm, fragment_size, 0, TAG_NODE_FRAGMENT, req)
                     call wait(req)
                     ! Note: must use blocking recv for allocatable arrays since size is unknown
                     allocate (fragment_indices(fragment_size))
                     call recv(resources%mpi_comms%world_comm, fragment_indices, 0, TAG_NODE_FRAGMENT, global_status)

                     ! Forward to worker
                     call isend(resources%mpi_comms%node_comm, fragment_idx, status%MPI_SOURCE, TAG_WORKER_FRAGMENT, req)
                     call wait(req)
                     call isend(resources%mpi_comms%node_comm, fragment_type, status%MPI_SOURCE, TAG_WORKER_FRAGMENT, req)
                     call wait(req)
                     call isend(resources%mpi_comms%node_comm, fragment_size, status%MPI_SOURCE, TAG_WORKER_FRAGMENT, req)
                     call wait(req)
                     call isend(resources%mpi_comms%node_comm, fragment_indices, status%MPI_SOURCE, TAG_WORKER_FRAGMENT, req)
                     call wait(req)

                     ! Track which fragment was sent to this worker
                     worker_fragment_map(status%MPI_SOURCE) = fragment_idx

                     deallocate (fragment_indices)
                  else
                     call isend(resources%mpi_comms%node_comm, -1, status%MPI_SOURCE, TAG_WORKER_FINISH, req)
                     call wait(req)
                     finished_workers = finished_workers + 1
                     more_fragments = .false.
                  end if
               else
                  call isend(resources%mpi_comms%node_comm, -1, status%MPI_SOURCE, TAG_WORKER_FINISH, req)
                  call wait(req)
                  finished_workers = finished_workers + 1
               end if
            end if
         end if
      end do
   end subroutine node_coordinator
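
For context, the following is a hedged sketch of the worker-side exchange this coordinator services. It only rearranges calls and tag constants that already appear in the source above (isend, irecv, recv, wait, result_isend, the TAG_WORKER_* constants); the worker_loop name, the run_fragment helper, and the assumption that the coordinator is rank 0 of node_comm are illustrative, and use statements are omitted because the wrapper module names are not shown on this page.

   subroutine worker_loop(resources, method_config, calc_type)
      !! Illustrative worker-side counterpart (sketch, not part of this module)
      type(resources_t), intent(in) :: resources
      type(method_config_t), intent(in) :: method_config
      integer(int32), intent(in) :: calc_type

      integer(int64) :: fragment_idx
      integer(int32) :: fragment_type, fragment_size
      integer(int32), allocatable :: fragment_indices(:)
      type(calculation_result_t) :: worker_result
      type(request_t) :: req
      type(MPI_Status) :: status

      work: do
         ! Ask the node coordinator (assumed to be node_comm rank 0) for work
         call isend(resources%mpi_comms%node_comm, 0, 0, TAG_WORKER_REQUEST, req)
         call wait(req)

         ! First reply carries either a fragment index (TAG_WORKER_FRAGMENT) or a finish signal
         call irecv(resources%mpi_comms%node_comm, fragment_idx, 0, MPI_ANY_TAG, req)
         call wait(req, status)
         if (status%MPI_TAG == TAG_WORKER_FINISH) exit work

         ! Remaining messages mirror what the coordinator forwards: type, size, index array
         call irecv(resources%mpi_comms%node_comm, fragment_type, 0, TAG_WORKER_FRAGMENT, req)
         call wait(req)
         call irecv(resources%mpi_comms%node_comm, fragment_size, 0, TAG_WORKER_FRAGMENT, req)
         call wait(req)
         allocate (fragment_indices(fragment_size))
         call recv(resources%mpi_comms%node_comm, fragment_indices, 0, TAG_WORKER_FRAGMENT, status)

         ! Run the fragment calculation (run_fragment is a hypothetical helper) ...
         call run_fragment(fragment_indices, fragment_type, method_config, calc_type, worker_result)
         deallocate (fragment_indices)

         ! ... and return the scalar result to the node coordinator
         call result_isend(worker_result, resources%mpi_comms%node_comm, 0, TAG_WORKER_SCALAR_RESULT, req)
         call wait(req)
      end do work
   end subroutine worker_loop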