handle_group_results Subroutine

public subroutine handle_group_results(world_comm, results, results_received, total_items, coord_timer, group_done_count, label)

Receive grouped result batches on rank 0 and update global progress counters.

Arguments

Type IntentOptional Attributes Name
type(comm_t), intent(in) :: world_comm
type(calculation_result_t), intent(inout) :: results(:)
integer(kind=int64), intent(inout) :: results_received
integer(kind=int64), intent(in) :: total_items
type(timer_type), intent(in) :: coord_timer
integer, intent(inout) :: group_done_count
character(len=*), intent(in), optional :: label

Calls

proc~~handle_group_results~~CallsGraph proc~handle_group_results handle_group_results abort_comm abort_comm proc~handle_group_results->abort_comm error error proc~handle_group_results->error get_elapsed_time get_elapsed_time proc~handle_group_results->get_elapsed_time info info proc~handle_group_results->info iprobe iprobe proc~handle_group_results->iprobe irecv irecv proc~handle_group_results->irecv proc~error_get_message error_t%error_get_message proc~handle_group_results->proc~error_get_message proc~result_irecv result_irecv proc~handle_group_results->proc~result_irecv recv recv proc~handle_group_results->recv to_char to_char proc~handle_group_results->to_char proc~result_irecv->irecv proc~result_irecv->recv

Called by

proc~~handle_group_results~~CalledByGraph proc~handle_group_results handle_group_results proc~global_coordinator_impl global_coordinator_impl proc~global_coordinator_impl->proc~handle_group_results proc~gmbe_pie_coordinator gmbe_pie_coordinator proc~gmbe_pie_coordinator->proc~handle_group_results proc~global_coordinator global_coordinator proc~global_coordinator->proc~global_coordinator_impl proc~gmbe_run_distributed gmbe_context_t%gmbe_run_distributed proc~gmbe_run_distributed->proc~gmbe_pie_coordinator interface~global_coordinator global_coordinator interface~global_coordinator->proc~global_coordinator proc~mbe_run_distributed mbe_context_t%mbe_run_distributed proc~mbe_run_distributed->interface~global_coordinator

Variables

Type Visibility Attributes Name Initial
integer(kind=int32), private :: batch_count
integer(kind=int64), private, allocatable :: batch_ids(:)
integer, private :: dummy_msg
logical, private :: has_pending
integer, private :: i
character(len=32), private :: item_label
type(request_t), private :: req
type(MPI_Status), private :: status

Source Code

 subroutine handle_group_results(world_comm, results, results_received, total_items, coord_timer, group_done_count, label)
      !! Receive grouped result batches on rank 0 and update global progress counters.
      type(comm_t), intent(in) :: world_comm
      type(calculation_result_t), intent(inout) :: results(:)
      integer(int64), intent(inout) :: results_received
      integer(int64), intent(in) :: total_items
      type(timer_type), intent(in) :: coord_timer
      integer, intent(inout) :: group_done_count
      character(len=*), intent(in), optional :: label

      integer(int32) :: batch_count
      integer(int64), allocatable :: batch_ids(:)
      type(MPI_Status) :: status
      logical :: has_pending
      type(request_t) :: req
      integer :: i, dummy_msg
      character(len=32) :: item_label

      if (present(label)) then
         item_label = label
      else
         item_label = "item"
      end if

      do
         call iprobe(world_comm, MPI_ANY_SOURCE, TAG_GROUP_RESULT, has_pending, status)
         if (.not. has_pending) exit

         call irecv(world_comm, batch_count, status%MPI_SOURCE, TAG_GROUP_RESULT, req)
         call wait(req)
         if (batch_count <= 0) cycle

         allocate (batch_ids(batch_count))
         call recv(world_comm, batch_ids, status%MPI_SOURCE, TAG_GROUP_RESULT, status)
         do i = 1, batch_count
            call result_irecv(results(batch_ids(i)), world_comm, status%MPI_SOURCE, TAG_GROUP_RESULT, req)
            call wait(req)

            if (results(batch_ids(i))%has_error) then
               call logger%error(trim(item_label)//" "//to_char(batch_ids(i))//" calculation failed: "// &
                                 results(batch_ids(i))%error%get_message())
               call abort_comm(world_comm, 1)
            end if

            results_received = results_received + 1
            if (mod(results_received, max(1_int64, total_items/10_int64)) == 0 .or. &
                results_received == total_items) then
               call logger%info("  Processed "//to_char(results_received)//"/"// &
                                to_char(total_items)//" "//trim(item_label)//"s ["// &
                                to_char(coord_timer%get_elapsed_time())//" s]")
            end if
         end do
         deallocate (batch_ids)
      end do

      do
         call iprobe(world_comm, MPI_ANY_SOURCE, TAG_GROUP_DONE, has_pending, status)
         if (.not. has_pending) exit
         call irecv(world_comm, dummy_msg, status%MPI_SOURCE, TAG_GROUP_DONE, req)
         call wait(req)
         group_done_count = group_done_count + 1
      end do
   end subroutine handle_group_results