subroutine handle_local_worker_results_to_batch(node_comm, world_comm, worker_map, batch_count, batch_ids, batch_results, &
        results_received)
  !! Drain all currently pending results from local workers and append each
  !! one to the outbound batch, flushing the batch whenever it reaches capacity.
  type(comm_t), intent(in) :: node_comm                         !! intra-node communicator the worker results arrive on
  type(comm_t), intent(in) :: world_comm                        !! global communicator used for flushing and for aborts
  integer(int64), intent(inout) :: worker_map(:)                !! per-rank assigned item id; 0 means "nothing assigned"
  integer(int32), intent(inout) :: batch_count                  !! number of entries currently staged in the batch
  integer(int64), intent(inout) :: batch_ids(:)                 !! staged item ids; its size defines the batch capacity
  type(calculation_result_t), intent(inout) :: batch_results(:) !! staged result payloads, parallel to batch_ids
  integer(int64), intent(inout), optional :: results_received   !! running tally of drained results, if supplied

  type(MPI_Status) :: probe_status
  logical :: message_waiting
  integer :: source_rank
  type(request_t) :: recv_request
  type(calculation_result_t) :: incoming
  integer(int64) :: assigned_item

  ! Nothing to drain when this rank is alone on the node.
  if (node_comm%size() <= 1) return

  drain: do
    ! Non-blocking probe; stop as soon as no further result is queued.
    call iprobe(node_comm, MPI_ANY_SOURCE, TAG_WORKER_SCALAR_RESULT, message_waiting, probe_status)
    if (.not. message_waiting) exit drain

    source_rank = probe_status%MPI_SOURCE
    ! A result from a rank with no recorded assignment indicates a bookkeeping bug.
    if (worker_map(source_rank) == 0) then
      call logger%error("Received result from worker "//to_char(source_rank)// &
          " but no item was assigned!")
      call abort_comm(world_comm, 1)
    end if

    ! Receive the probed message; the immediate wait makes the irecv effectively blocking.
    call result_irecv(incoming, node_comm, source_rank, TAG_WORKER_SCALAR_RESULT, recv_request)
    call wait(recv_request)

    ! A failed calculation is fatal for the whole run.
    if (incoming%has_error) then
      call logger%error("Item "//to_char(worker_map(source_rank))// &
          " calculation failed: "// &
          incoming%error%get_message())
      call abort_comm(world_comm, 1)
    end if

    assigned_item = worker_map(source_rank)
    worker_map(source_rank) = 0  ! worker is idle again

    ! Flush before appending to guarantee capacity, and again afterwards so a
    ! batch that just became full is pushed out eagerly.
    if (batch_count >= size(batch_ids)) then
      call flush_group_results(world_comm, batch_count, batch_ids, batch_results)
    end if
    call append_result_to_batch(assigned_item, incoming, batch_count, batch_ids, batch_results)
    if (present(results_received)) results_received = results_received + 1_int64
    if (batch_count >= size(batch_ids)) then
      call flush_group_results(world_comm, batch_count, batch_ids, batch_results)
    end if

    call incoming%destroy()
  end do drain
end subroutine handle_local_worker_results_to_batch