mqc_mbe_mpi_fragment_distribution_scheme.F90 Source File


This file depends on:

mqc_error.f90
mqc_json_output_types.f90
mqc_mbe_fragment_distribution_scheme.F90
mqc_result_types.f90

Source Code

submodule(mqc_mbe_fragment_distribution_scheme) mpi_fragment_work_smod
   use mqc_error, only: ERROR_VALIDATION, ERROR_GENERIC
   implicit none

contains

   module subroutine do_fragment_work(fragment_idx, result, method_config, phys_frag, calc_type, world_comm)
      !! Process a single fragment for quantum chemistry calculation
      !!
      !! Performs energy and gradient calculation on a molecular fragment using
      !! the factory pattern to create a calculator from the provided method_config.
      !! Verbosity is controlled by the global logger level.

      use pic_logger, only: verbose_level

      integer(int64), intent(in) :: fragment_idx        !! Fragment index for identification
      type(calculation_result_t), intent(out) :: result  !! Computation results
      type(method_config_t), intent(in) :: method_config  !! Method configuration
      type(physical_fragment_t), intent(in), optional :: phys_frag  !! Fragment geometry
      integer(int32), intent(in) :: calc_type  !! Calculation type
      type(comm_t), intent(in), optional :: world_comm  !! MPI communicator for abort

      integer :: current_log_level  !! Current logger verbosity level
      logical :: is_verbose  !! Whether verbose output is enabled
      integer(int32) :: calc_type_local  !! Local copy of calc_type
      type(method_config_t) :: local_config  !! Local copy for verbose override
      class(qc_method_t), allocatable :: calculator  !! Polymorphic calculator instance

      calc_type_local = calc_type

      ! Query logger to determine verbosity
      call logger%configuration(level=current_log_level)
      is_verbose = (current_log_level >= verbose_level)

      ! Print fragment geometry if provided and verbose mode is enabled
      if (present(phys_frag)) then
         if (is_verbose) then
            call print_fragment_xyz(fragment_idx, phys_frag)
         end if

         ! Copy config and override verbose based on logger level
         local_config = method_config
         local_config%verbose = is_verbose

         ! Create calculator using factory
         calculator = create_method(local_config)

         ! Run the calculation using polymorphic dispatch
         select case (calc_type_local)
         case (CALC_TYPE_ENERGY)
            call calculator%calc_energy(phys_frag, result)
         case (CALC_TYPE_GRADIENT)
            call calculator%calc_gradient(phys_frag, result)
         case (CALC_TYPE_HESSIAN)
            call calculator%calc_hessian(phys_frag, result)
         case default
            call result%error%set(ERROR_VALIDATION, "Unknown calc_type: "//calc_type_to_string(calc_type_local))
            result%has_error = .true.
            return
         end select

         ! Check for calculation errors
         if (result%has_error) then
            call result%error%add_context("do_fragment_work:fragment_"//to_char(fragment_idx))
            return
         end if

         ! Copy fragment distance to result for JSON output
         result%distance = phys_frag%distance

         ! Cleanup
         deallocate (calculator)
      else
         ! For empty fragments, set energy to zero
         call result%energy%reset()
         result%has_energy = .true.
      end if
   end subroutine do_fragment_work

   module subroutine global_coordinator(resources, total_fragments, polymers, max_level, &
                                        node_leader_ranks, num_nodes, sys_geom, method_config, calc_type, json_data)
      !! Global coordinator for distributing fragments to node coordinators.
      !! Also acts as the node coordinator for a single-node calculation.
      !! Uses int64 for total_fragments to handle large fragment counts that overflow int32.
      !! Bond connectivity is accessed via sys_geom%bonds
      use mqc_json_output_types, only: json_output_data_t
      type(resources_t), intent(in) :: resources
      integer(int64), intent(in) :: total_fragments
      integer, intent(in) :: max_level, num_nodes
      integer, intent(in) :: polymers(:, :), node_leader_ranks(:)
      type(system_geometry_t), intent(in), optional :: sys_geom
      type(method_config_t), intent(in) :: method_config  !! Method configuration
      integer(int32), intent(in) :: calc_type
      type(json_output_data_t), intent(out), optional :: json_data  !! JSON output data

      type(timer_type) :: coord_timer
      integer(int64) :: current_fragment, results_received
      integer :: finished_nodes
      integer :: request_source, dummy_msg
      integer(int64) :: fragment_idx
      type(MPI_Status) :: status, local_status
      logical :: handling_local_workers
      logical :: has_pending
      integer(int32) :: calc_type_local

      ! For local workers
      integer :: local_finished_workers, local_dummy

      ! Storage for results
      type(calculation_result_t), allocatable :: results(:)
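      ! Maps each local worker rank to the fragment index it is currently processing (0 = idle)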
      integer(int64) :: worker_fragment_map(resources%mpi_comms%node_comm%size())
      integer :: worker_source

      ! MPI request handles for non-blocking operations
      type(request_t) :: req

      calc_type_local = calc_type

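      ! Fragments are handed out in descending order: current_fragment counts down from
      ! total_fragments to 1, and a value of 0 means no work is left to distribute.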
      current_fragment = total_fragments
      finished_nodes = 0
      local_finished_workers = 0
      handling_local_workers = (resources%mpi_comms%node_comm%size() > 1)
      results_received = 0_int64

      ! Allocate storage for results
      allocate (results(total_fragments))
      worker_fragment_map = 0

      call logger%verbose("Global coordinator starting with "//to_char(total_fragments)// &
                          " fragments for "//to_char(num_nodes)//" nodes")

      call coord_timer%start()
      do while (finished_nodes < num_nodes)

         ! PRIORITY 1: Check for incoming results from local workers
         ! This MUST be checked before sending new work to avoid race conditions
         if (handling_local_workers) then
            ! Keep checking for results until there are none pending
            do
               call iprobe(resources%mpi_comms%node_comm, MPI_ANY_SOURCE, TAG_WORKER_SCALAR_RESULT, &
                           has_pending, local_status)
               if (.not. has_pending) exit

               worker_source = local_status%MPI_SOURCE

               ! Safety check: worker should have a fragment assigned
               if (worker_fragment_map(worker_source) == 0) then
                  call logger%error("Received result from worker "//to_char(worker_source)// &
                                    " but no fragment was assigned!")
                  call abort_comm(resources%mpi_comms%world_comm, 1)
               end if

               ! Receive result and store it using the fragment index for this worker
               call result_irecv(results(worker_fragment_map(worker_source)), resources%mpi_comms%node_comm, &
                                 worker_source, TAG_WORKER_SCALAR_RESULT, req)
               call wait(req)

               ! Check for calculation errors from worker
               if (results(worker_fragment_map(worker_source))%has_error) then
                  call logger%error("Fragment "//to_char(worker_fragment_map(worker_source))// &
                                    " calculation failed: "// &
                                    results(worker_fragment_map(worker_source))%error%get_message())
                  call abort_comm(resources%mpi_comms%world_comm, 1)
               end if

               ! Clear the mapping since we've received the result
               worker_fragment_map(worker_source) = 0
               results_received = results_received + 1
               if (mod(results_received, max(1_int64, total_fragments/10)) == 0 .or. &
                   results_received == total_fragments) then
                  call logger%info("  Processed "//to_char(results_received)//"/"// &
                                   to_char(total_fragments)//" fragments ["// &
                                   to_char(coord_timer%get_elapsed_time())//" s]")
               end if
            end do
         end if

         ! PRIORITY 1b: Check for incoming results from remote node coordinators
         do
            call iprobe(resources%mpi_comms%world_comm, MPI_ANY_SOURCE, TAG_NODE_SCALAR_RESULT, has_pending, status)
            if (.not. has_pending) exit

            ! Receive fragment index and result from node coordinator
            ! TODO: serialize the data for better performance
            call irecv(resources%mpi_comms%world_comm, fragment_idx, status%MPI_SOURCE, TAG_NODE_SCALAR_RESULT, req)
            call wait(req)
            call result_irecv(results(fragment_idx), resources%mpi_comms%world_comm, status%MPI_SOURCE, &
                              TAG_NODE_SCALAR_RESULT, req)
            call wait(req)

            ! Check for calculation errors from node coordinator
            if (results(fragment_idx)%has_error) then
               call logger%error("Fragment "//to_char(fragment_idx)//" calculation failed: "// &
                                 results(fragment_idx)%error%get_message())
               call abort_comm(resources%mpi_comms%world_comm, 1)
            end if

            results_received = results_received + 1
            if (mod(results_received, max(1_int64, total_fragments/10)) == 0 .or. &
                results_received == total_fragments) then
               call logger%info("  Processed "//to_char(results_received)//"/"// &
                                to_char(total_fragments)//" fragments ["// &
                                to_char(coord_timer%get_elapsed_time())//" s]")
            end if
         end do

         ! PRIORITY 2: Remote node coordinator requests
         call iprobe(resources%mpi_comms%world_comm, MPI_ANY_SOURCE, TAG_NODE_REQUEST, has_pending, status)
         if (has_pending) then
            call irecv(resources%mpi_comms%world_comm, dummy_msg, status%MPI_SOURCE, TAG_NODE_REQUEST, req)
            call wait(req)
            request_source = status%MPI_SOURCE

            if (current_fragment >= 1) then
               call send_fragment_to_node(resources%mpi_comms%world_comm, current_fragment, polymers, request_source)
               current_fragment = current_fragment - 1
            else
               call isend(resources%mpi_comms%world_comm, -1, request_source, TAG_NODE_FINISH, req)
               call wait(req)
               finished_nodes = finished_nodes + 1
            end if
         end if

         ! PRIORITY 3: Local workers (shared memory) - send new work
         if (handling_local_workers .and. local_finished_workers < resources%mpi_comms%node_comm%size() - 1) then
            call iprobe(resources%mpi_comms%node_comm, MPI_ANY_SOURCE, TAG_WORKER_REQUEST, has_pending, local_status)
            if (has_pending) then
               ! Only process work request if this worker doesn't have pending results
               if (worker_fragment_map(local_status%MPI_SOURCE) == 0) then
                  call irecv(resources%mpi_comms%node_comm, local_dummy, local_status%MPI_SOURCE, TAG_WORKER_REQUEST, req)
                  call wait(req)

                  if (current_fragment >= 1) then
                     call send_fragment_to_worker(resources%mpi_comms%node_comm, current_fragment, polymers, &
                                                  local_status%MPI_SOURCE)
                     ! Track which fragment was sent to this worker
                     worker_fragment_map(local_status%MPI_SOURCE) = current_fragment
                     current_fragment = current_fragment - 1
                  else
                     call isend(resources%mpi_comms%node_comm, -1, local_status%MPI_SOURCE, TAG_WORKER_FINISH, req)
                     call wait(req)
                     local_finished_workers = local_finished_workers + 1
                  end if
               end if
               ! If worker still has pending results, skip the work request
               ! It will be processed on the next iteration after results are received
            end if
         end if

         ! Finalize local worker completion
         if (handling_local_workers .and. local_finished_workers >= resources%mpi_comms%node_comm%size() - 1 &
             .and. results_received >= total_fragments) then
            handling_local_workers = .false.
            if (num_nodes == 1) then
               finished_nodes = finished_nodes + 1
               call logger%debug("Manually incremented finished_nodes for self")
            else
               finished_nodes = finished_nodes + 1
               call logger%verbose("Global coordinator finished local workers")
            end if
         end if
      end do

      call logger%verbose("Global coordinator finished all fragments")
      call coord_timer%stop()
      call logger%info("Time to evaluate all fragments "//to_char(coord_timer%get_elapsed_time())//" s")
      block
         use mqc_result_types, only: mbe_result_t
         type(mbe_result_t) :: mbe_result

         ! Compute the many-body expansion
         call logger%info(" ")
         call logger%info("Computing Many-Body Expansion (MBE)...")
         call coord_timer%start()

         ! Allocate mbe_result components based on calc_type
         call mbe_result%allocate_dipole()  ! Always compute dipole
         if (calc_type_local == CALC_TYPE_HESSIAN) then
            if (.not. present(sys_geom)) then
               call logger%error("sys_geom required for Hessian calculation in global_coordinator")
               call abort_comm(resources%mpi_comms%world_comm, 1)
            end if
            call mbe_result%allocate_gradient(sys_geom%total_atoms)
            call mbe_result%allocate_hessian(sys_geom%total_atoms)
         else if (calc_type_local == CALC_TYPE_GRADIENT) then
            if (.not. present(sys_geom)) then
               call logger%error("sys_geom required for gradient calculation in global_coordinator")
               call abort_comm(resources%mpi_comms%world_comm, 1)
            end if
            call mbe_result%allocate_gradient(sys_geom%total_atoms)
         end if

         call compute_mbe(polymers, total_fragments, max_level, results, mbe_result, &
                          sys_geom, resources%mpi_comms%world_comm, json_data)
         call mbe_result%destroy()

         call coord_timer%stop()
         call logger%info("Time to compute MBE "//to_char(coord_timer%get_elapsed_time())//" s")

      end block

      ! Cleanup
      deallocate (results)
   end subroutine global_coordinator

   subroutine send_fragment_to_node(world_comm, fragment_idx, polymers, dest_rank)
      !! Send fragment data to remote node coordinator
      !! Uses int64 for fragment_idx to handle large fragment indices that overflow int32.
      type(comm_t), intent(in) :: world_comm
      integer(int64), intent(in) :: fragment_idx
      integer, intent(in) :: dest_rank
      integer, intent(in) :: polymers(:, :)
      integer :: fragment_size
      integer(int32) :: fragment_type
      integer, allocatable :: fragment_indices(:)
      type(request_t) :: req(4)
      integer(int64) :: fragment_idx_int64

      fragment_size = count(polymers(fragment_idx, :) > 0)
      allocate (fragment_indices(fragment_size))
      fragment_indices = polymers(fragment_idx, 1:fragment_size)

      ! Standard MBE always uses monomer indices
      fragment_type = FRAGMENT_TYPE_MONOMERS

      ! TODO: serialize the data for better performance
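      ! The fragment payload is sent as four messages under TAG_NODE_FRAGMENT:
      ! the fragment index (int64), its type, its size, and the monomer index list.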
      fragment_idx_int64 = int(fragment_idx, kind=int64)
      call isend(world_comm, fragment_idx_int64, dest_rank, TAG_NODE_FRAGMENT, req(1))
      call isend(world_comm, fragment_type, dest_rank, TAG_NODE_FRAGMENT, req(2))
      call isend(world_comm, fragment_size, dest_rank, TAG_NODE_FRAGMENT, req(3))
      call isend(world_comm, fragment_indices, dest_rank, TAG_NODE_FRAGMENT, req(4))

      ! Wait for all sends to complete
      call wait(req(1))
      call wait(req(2))
      call wait(req(3))
      call wait(req(4))

      deallocate (fragment_indices)
   end subroutine send_fragment_to_node

   subroutine send_fragment_to_worker(node_comm, fragment_idx, polymers, dest_rank)
      !! Send fragment data to local worker
      !! Uses int64 for fragment_idx to handle large fragment indices that overflow int32.
      type(comm_t), intent(in) :: node_comm
      integer(int64), intent(in) :: fragment_idx
      integer, intent(in) :: dest_rank
      integer, intent(in) :: polymers(:, :)
      integer :: fragment_size
      integer(int32) :: fragment_type
      integer, allocatable :: fragment_indices(:)
      type(request_t) :: req(4)
      integer(int64) :: fragment_idx_int64

      fragment_size = count(polymers(fragment_idx, :) > 0)
      allocate (fragment_indices(fragment_size))
      fragment_indices = polymers(fragment_idx, 1:fragment_size)

      ! Standard MBE always uses monomer indices
      fragment_type = FRAGMENT_TYPE_MONOMERS

      ! TODO: serialize the data for better performance
      fragment_idx_int64 = int(fragment_idx, kind=int64)
      call isend(node_comm, fragment_idx_int64, dest_rank, TAG_WORKER_FRAGMENT, req(1))
      call isend(node_comm, fragment_type, dest_rank, TAG_WORKER_FRAGMENT, req(2))
      call isend(node_comm, fragment_size, dest_rank, TAG_WORKER_FRAGMENT, req(3))
      call isend(node_comm, fragment_indices, dest_rank, TAG_WORKER_FRAGMENT, req(4))

      ! Wait for all sends to complete
      call wait(req(1))
      call wait(req(2))
      call wait(req(3))
      call wait(req(4))

      deallocate (fragment_indices)
   end subroutine send_fragment_to_worker

   module subroutine node_coordinator(resources, method_config, calc_type)
      !! Node coordinator for distributing fragments to local workers
      !! Handles work requests and result collection from local workers
      type(resources_t), intent(in) :: resources
      type(method_config_t), intent(in) :: method_config  !! Method configuration (passed through to workers)
      integer(int32), intent(in) :: calc_type

      integer(int64) :: fragment_idx
      integer(int32) :: fragment_size, fragment_type, dummy_msg
      integer(int32) :: finished_workers
      integer(int32), allocatable :: fragment_indices(:)
      type(MPI_Status) :: status, global_status
      logical :: local_message_pending, more_fragments, has_result
      integer(int32) :: local_dummy

      ! For tracking worker-fragment mapping and collecting results
      integer(int64) :: worker_fragment_map(resources%mpi_comms%node_comm%size())
      integer(int32) :: worker_source
      type(calculation_result_t) :: worker_result

      ! MPI request handles for non-blocking operations
      type(request_t) :: req

      finished_workers = 0
      more_fragments = .true.
      dummy_msg = 0
      worker_fragment_map = 0

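      ! Rank 0 on world_comm is the global coordinator: this loop relays local work
      ! requests to it and forwards finished worker results back, tagged with their
      ! fragment indices.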
      do while (finished_workers < resources%mpi_comms%node_comm%size() - 1)

         ! PRIORITY 1: Check for incoming results from local workers
         call iprobe(resources%mpi_comms%node_comm, MPI_ANY_SOURCE, TAG_WORKER_SCALAR_RESULT, has_result, status)
         if (has_result) then
            worker_source = status%MPI_SOURCE

            ! Safety check: worker should have a fragment assigned
            if (worker_fragment_map(worker_source) == 0) then
               call logger%error("Node coordinator received result from worker "//to_char(worker_source)// &
                                 " but no fragment was assigned!")
               call abort_comm(resources%mpi_comms%world_comm, 1)
            end if

            ! Receive result from worker
            call result_irecv(worker_result, resources%mpi_comms%node_comm, worker_source, TAG_WORKER_SCALAR_RESULT, req)
            call wait(req)

            ! Check for calculation errors before forwarding
            if (worker_result%has_error) then
               call logger%error("Fragment "//to_char(worker_fragment_map(worker_source))// &
                                 " calculation failed on worker "//to_char(worker_source)//": "// &
                                 worker_result%error%get_message())
               call abort_comm(resources%mpi_comms%world_comm, 1)
            end if

            ! Forward results to global coordinator with fragment index
            call isend(resources%mpi_comms%world_comm, worker_fragment_map(worker_source), 0, &
                       TAG_NODE_SCALAR_RESULT, req)  ! fragment_idx
            call wait(req)
            call result_isend(worker_result, resources%mpi_comms%world_comm, 0, TAG_NODE_SCALAR_RESULT, req)  ! result
            call wait(req)

            ! Clear the mapping
            worker_fragment_map(worker_source) = 0
         end if

         ! PRIORITY 2: Check for work requests from local workers
         call iprobe(resources%mpi_comms%node_comm, MPI_ANY_SOURCE, TAG_WORKER_REQUEST, local_message_pending, status)

         if (local_message_pending) then
            ! Only process work request if this worker doesn't have pending results
            if (worker_fragment_map(status%MPI_SOURCE) == 0) then
               call irecv(resources%mpi_comms%node_comm, local_dummy, status%MPI_SOURCE, TAG_WORKER_REQUEST, req)
               call wait(req)

               if (more_fragments) then
                  call isend(resources%mpi_comms%world_comm, dummy_msg, 0, TAG_NODE_REQUEST, req)
                  call wait(req)
                  call irecv(resources%mpi_comms%world_comm, fragment_idx, 0, MPI_ANY_TAG, req)
                  call wait(req, global_status)

                  if (global_status%MPI_TAG == TAG_NODE_FRAGMENT) then
                     ! Receive fragment type (0 = monomer indices, 1 = intersection atom list)
                     call irecv(resources%mpi_comms%world_comm, fragment_type, 0, TAG_NODE_FRAGMENT, req)
                     call wait(req)
                     call irecv(resources%mpi_comms%world_comm, fragment_size, 0, TAG_NODE_FRAGMENT, req)
                     call wait(req)
                     ! Note: must use blocking recv for allocatable arrays since size is unknown
                     allocate (fragment_indices(fragment_size))
                     call recv(resources%mpi_comms%world_comm, fragment_indices, 0, TAG_NODE_FRAGMENT, global_status)

                     ! Forward to worker
                     call isend(resources%mpi_comms%node_comm, fragment_idx, status%MPI_SOURCE, TAG_WORKER_FRAGMENT, req)
                     call wait(req)
                     call isend(resources%mpi_comms%node_comm, fragment_type, status%MPI_SOURCE, TAG_WORKER_FRAGMENT, req)
                     call wait(req)
                     call isend(resources%mpi_comms%node_comm, fragment_size, status%MPI_SOURCE, TAG_WORKER_FRAGMENT, req)
                     call wait(req)
                     call isend(resources%mpi_comms%node_comm, fragment_indices, status%MPI_SOURCE, &
                                TAG_WORKER_FRAGMENT, req)
                     call wait(req)

                     ! Track which fragment was sent to this worker
                     worker_fragment_map(status%MPI_SOURCE) = fragment_idx

                     deallocate (fragment_indices)
                  else
                     call isend(resources%mpi_comms%node_comm, -1, status%MPI_SOURCE, TAG_WORKER_FINISH, req)
                     call wait(req)
                     finished_workers = finished_workers + 1
                     more_fragments = .false.
                  end if
               else
                  call isend(resources%mpi_comms%node_comm, -1, status%MPI_SOURCE, TAG_WORKER_FINISH, req)
                  call wait(req)
                  finished_workers = finished_workers + 1
               end if
            end if
         end if
      end do
   end subroutine node_coordinator

   module subroutine node_worker(resources, sys_geom, method_config, calc_type)
      !! Node worker for processing fragments assigned by node coordinator
      !! Bond connectivity is accessed via sys_geom%bonds
      use mqc_error, only: error_t
      type(resources_t), intent(in) :: resources
      type(system_geometry_t), intent(in), optional :: sys_geom
      type(method_config_t), intent(in) :: method_config  !! Method configuration
      integer(int32), intent(in) :: calc_type

      integer(int64) :: fragment_idx
      integer(int32) :: fragment_size, dummy_msg
      integer(int32) :: fragment_type  !! 0 = monomer (indices), 1 = intersection (atom list)
      integer(int32), allocatable :: fragment_indices(:)
      type(calculation_result_t) :: result
      type(MPI_Status) :: status
      type(physical_fragment_t) :: phys_frag
      type(error_t) :: error

      ! MPI request handles for non-blocking operations
      type(request_t) :: req

      dummy_msg = 0

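      ! Worker loop: request work from the node coordinator, then block on the reply.
      ! The reply tag selects the action: TAG_WORKER_FRAGMENT delivers a fragment to
      ! compute, TAG_WORKER_FINISH ends the loop.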
      do
         call isend(resources%mpi_comms%node_comm, dummy_msg, 0, TAG_WORKER_REQUEST, req)
         call wait(req)
         call irecv(resources%mpi_comms%node_comm, fragment_idx, 0, MPI_ANY_TAG, req)
         call wait(req, status)

         select case (status%MPI_TAG)
         case (TAG_WORKER_FRAGMENT)
            ! Receive fragment type (0 = monomer indices, 1 = intersection atom list)
            call irecv(resources%mpi_comms%node_comm, fragment_type, 0, TAG_WORKER_FRAGMENT, req)
            call wait(req)
            call irecv(resources%mpi_comms%node_comm, fragment_size, 0, TAG_WORKER_FRAGMENT, req)
            call wait(req)
            ! Note: must use blocking recv for allocatable arrays since size is unknown
            allocate (fragment_indices(fragment_size))
            call recv(resources%mpi_comms%node_comm, fragment_indices, 0, TAG_WORKER_FRAGMENT, status)

            ! Build physical fragment based on type
            if (present(sys_geom)) then
               if (fragment_type == 0) then
                  ! Monomer: fragment_indices are monomer indices
                  call build_fragment_from_indices(sys_geom, fragment_indices, phys_frag, error, sys_geom%bonds)
               else
                  ! Intersection: fragment_indices are atom indices
                  call build_fragment_from_atom_list(sys_geom, fragment_indices, fragment_size, &
                                                     phys_frag, error, sys_geom%bonds)
               end if

               if (error%has_error()) then
                  call logger%error(error%get_full_trace())
                  call abort_comm(resources%mpi_comms%world_comm, 1)
               end if

               ! Process the chemistry fragment with physical geometry
               call do_fragment_work(fragment_idx, result, method_config, phys_frag, calc_type, &
                                     resources%mpi_comms%world_comm)

               call phys_frag%destroy()
            else
               ! Process without physical geometry (old behavior)
               call do_fragment_work(fragment_idx, result, method_config, &
                                     calc_type=calc_type, world_comm=resources%mpi_comms%world_comm)
            end if

            ! Send result back to coordinator
            call result_isend(result, resources%mpi_comms%node_comm, 0, TAG_WORKER_SCALAR_RESULT, req)
            call wait(req)

            ! Clean up result
            call result%destroy()
            deallocate (fragment_indices)
         case (TAG_WORKER_FINISH)
            exit
         case default
            ! Unexpected MPI tag - this should not happen in normal operation
            call logger%error("Worker received unexpected MPI tag: "//to_char(status%MPI_TAG))
            call logger%error("Expected TAG_WORKER_FRAGMENT or TAG_WORKER_FINISH")
            call abort_comm(resources%mpi_comms%world_comm, 1)
         end select
      end do
   end subroutine node_worker

end submodule mpi_fragment_work_smod
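
The three role routines above are intended to be selected by MPI rank. The sketch below is illustrative only: it assumes that the global coordinator runs on world rank 0, that each node leader is rank 0 of its node communicator, and that comm_t offers a rank() accessor analogous to the size() method used in this file. It is not part of this source file.

   ! Hypothetical driver sketch (not from this file): dispatch the roles by rank.
   if (resources%mpi_comms%world_comm%rank() == 0) then
      ! One global coordinator for the whole job; it also serves its own node's workers.
      call global_coordinator(resources, total_fragments, polymers, max_level, &
                              node_leader_ranks, num_nodes, sys_geom, method_config, calc_type)
   else if (resources%mpi_comms%node_comm%rank() == 0) then
      ! One coordinator per remaining node relays work between the global coordinator
      ! and its local workers.
      call node_coordinator(resources, method_config, calc_type)
   else
      ! All other ranks compute fragments handed out by their node coordinator.
      call node_worker(resources, sys_geom, method_config, calc_type)
   end if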