subroutine handle_node_results_to_batch(world_comm, batch_count, batch_ids, batch_results, results_received)
  !! Drain every node-level scalar result currently queued on the communicator
  !! and fold each one into the outbound batch, flushing the batch to the
  !! group whenever it fills. A result carrying an error aborts the run.
  type(comm_t), intent(in) :: world_comm
  integer(int32), intent(inout) :: batch_count
  integer(int64), intent(inout) :: batch_ids(:)
  type(calculation_result_t), intent(inout) :: batch_results(:)
  !> Running tally of results consumed; incremented once per drained message pair.
  integer(int64), intent(inout), optional :: results_received

  type(MPI_Status) :: probe_status
  type(request_t) :: recv_request
  type(calculation_result_t) :: incoming
  integer(int64) :: work_item_id
  logical :: message_waiting

  drain: do
    ! Non-blocking check for the next pending node result; stop once none remain.
    call iprobe(world_comm, MPI_ANY_SOURCE, TAG_NODE_SCALAR_RESULT, message_waiting, probe_status)
    if (.not. message_waiting) exit drain

    ! The sender transmits two back-to-back messages on the same tag:
    ! first the item id, then the result payload. Pin the source matched
    ! by the probe so both receives come from the same rank.
    call irecv(world_comm, work_item_id, probe_status%MPI_SOURCE, TAG_NODE_SCALAR_RESULT, recv_request)
    call wait(recv_request)
    call result_irecv(incoming, world_comm, probe_status%MPI_SOURCE, TAG_NODE_SCALAR_RESULT, recv_request)
    call wait(recv_request)

    ! A failed calculation is fatal for the whole run.
    if (incoming%has_error) then
      call logger%error("Item "//to_char(work_item_id)//" calculation failed: "// &
        incoming%error%get_message())
      call abort_comm(world_comm, 1)
    end if

    ! Guarantee room before appending (the batch may already be full on
    ! entry), then flush eagerly as soon as the append tops it off.
    if (batch_count >= size(batch_ids)) then
      call flush_group_results(world_comm, batch_count, batch_ids, batch_results)
    end if
    call append_result_to_batch(work_item_id, incoming, batch_count, batch_ids, batch_results)
    if (present(results_received)) results_received = results_received + 1_int64
    if (batch_count >= size(batch_ids)) then
      call flush_group_results(world_comm, batch_count, batch_ids, batch_results)
    end if

    ! append_result_to_batch has taken what it needs; release the local copy.
    call incoming%destroy()
  end do drain
end subroutine handle_node_results_to_batch