module spmd_utils 113,5

!----------------------------------------------------------------------- 
! 
! Purpose: This module is responsible for miscellaneous SPMD utilities
!          and information that are shared between dynamics and 
!          physics packages.  
! 
! Author:
!   Original routines:  CMS
!   Module:             T. Henderson, December 2003
!   swap routines:      P. Worley
!   fc routines:        P. Worley
!   SMP node id logic:  P. Worley
!
! $Id$
! 
!-----------------------------------------------------------------------

!-----------------------------------------------------------------------
!- use statements ------------------------------------------------------
!-----------------------------------------------------------------------
   use shr_kind_mod, only: r8 => shr_kind_r8
   use abortutils,   only: endrun
#if ( defined SPMD )
   use mpishorthand, only: mpiint, mpii8, mpichar, mpilog, mpipk,      &
                           mpic16, mpir8, mpir4, mpicom, mpimax,       &
                           mpi_max_processor_name,                     &
                           mpi_integer, mpi_integer8, mpi_character,   &
                           mpi_logical, mpi_real8, mpi_real4,          &
                           mpi_complex16,                              &
                           mpi_packed, mpi_comm_world, mpi_max,        &
                           mpi_comm_null, mpi_group_null,              &
                           mpi_undefined, mpi_status_size, mpi_success
#endif
   use dycore,       only: dycore_is
   use cam_logfile,  only: iulog

!-----------------------------------------------------------------------
!- module boilerplate --------------------------------------------------
!-----------------------------------------------------------------------
   implicit none
   private                   ! Make the default access private
   save


!-----------------------------------------------------------------------
! Public interfaces ----------------------------------------------------
!-----------------------------------------------------------------------
   public pair      ! $$$here...  originally from eul|sld/spmd_dyn
   public ceil2     ! $$$here...  originally from eul|sld/spmd_dyn
   public spmdinit
#if ( defined SPMD )
   public swap_comm_defaultopts 
   public swap_comm_setopts 
   public swapm
   public fc_gather_defaultopts 
   public fc_gather_setopts 
   public fc_gatherv
   public fc_gathervr4
   public fc_gathervint
   public fc_gathervc
   public altalltoallv
#endif

!-----------------------------------------------------------------------
! Public data ----------------------------------------------------------
!-----------------------------------------------------------------------
! physics-motivated dynamics decomposition request
   logical, parameter :: def_mirror = .false.                 ! default
   logical, public    :: phys_mirror_decomp_req = def_mirror 
                         ! flag indicating whether latitudes and their
                         ! reflections across the equator should be 
                         ! assigned to consecutive processes

#if (defined SPMD)
   public :: mpicom
   public :: mpichar
#else
   integer, public              :: mpicom
   integer, public              :: mpichar
#endif
   logical, public              :: masterproc
   integer, public              :: masterprocid
   integer, public              :: iam
   integer, public              :: npes
   integer, public              :: nsmps
   integer, allocatable, public :: proc_smp_map(:)
   integer, parameter           :: DEFAULT_MASTERPROC=0 
                                      ! the value of iam which is assigned 
                                      ! the masterproc duties

!-----------------------------------------------------------------------
! Private data ---------------------------------------------------------
!-----------------------------------------------------------------------
#if ( defined SPMD )
! Swap communication protocol options (reduced set):
!  3, 5:                  nonblocking send
!  2, 3, 4, 5:            nonblocking receive
!  4, 5:                  ready send
   integer, private, parameter :: min_comm_protocol =  2
   integer, private, parameter :: max_comm_protocol =  5
   integer, private, parameter :: def_comm_protocol =  4        ! default
   integer, private :: swap_comm_protocol = def_comm_protocol

! Swap communication maximum request count:
! = -1,0: do not limit number of outstanding send/receive requests
!    > 0: do not allow more than swap_comm_maxreq outstanding
!         nonblocking send requests or nonblocking receive requests
   integer, private, parameter :: def_comm_maxreq = 128        ! default
   integer, private :: swap_comm_maxreq = def_comm_maxreq

! Flow-controlled gather option:
!   < 0: use MPI_Gather
!  >= 0: use point-to-point with handshaking messages and 
!        preposting receive requests up to 
!         max(min(1,fc_gather_flow_cntl),max_gather_block_size) 
!        ahead
   integer, private, parameter :: max_gather_block_size = 64 ! max and default
   integer, private :: fc_gather_flow_cntl = max_gather_block_size

#endif

!-----------------------------------------------------------------------
! Subroutines and functions --------------------------------------------
!-----------------------------------------------------------------------
contains

!========================================================================


   integer function pair(np,p,k) 6

      integer np,p,k,q
      q = ieor(p,k)
      if(q.gt.np-1) then
         pair = -1
      else
         pair = q
      endif
      return

   end function pair

!========================================================================


  integer function ceil2(n) 14
     integer n,p
     p=1
     do while(p.lt.n)
        p=p*2
     enddo
     ceil2=p
     return
  end function ceil2

!========================================================================
  

  subroutine spmdinit( mpicom_atm ) 1,3
    !----------------------------------------------------------------------- 
    ! 
    ! Purpose: MPI initialization routine:  
    ! 
    ! Method: get number of cpus, processes, tids, etc
    !         dynamics and physics decompositions are set up later
    ! 
    ! Author: CCM Core Group
    ! 
    !-----------------------------------------------------------------------

    implicit none
    integer, intent(in) :: mpicom_atm

#if ( defined SPMD )
    !
    ! Local workspace
    !
    integer i,j,c             ! indices
    integer npthreads         ! thread status
    integer ier               ! return error status    
    integer length            ! length of name
    integer max_len           ! maximum name length
    integer, allocatable :: lengths(:)! max lengths of names for use in gatherv
    integer, allocatable :: displs(:) ! offsets for use in gatherv
    logical done
    character, allocatable                             :: proc_name(:)  ! processor name, this task
    character, allocatable                             :: proc_names(:) ! processor names, all tasks
    character(len=mpi_max_processor_name)              :: tmp_name      ! temporary storage
    character(len=mpi_max_processor_name), allocatable :: smp_names(:)  ! SMP name
    logical mpi_running       ! returned value indicates if MPI_INIT has been called

    !---------------------------------------------------------------------------
    !
    ! Determine CAM MPI communicator group
    !
    mpicom  = mpicom_atm
    !
    ! Set mpishorthand variables.  Need to set as variables rather than parameters since
    ! some MPI implementations set values for MPI tags at run time
    !
    mpiint  = mpi_integer
    mpii8   = mpi_integer8
    mpichar = mpi_character
    mpilog  = mpi_logical
    mpir4   = mpi_real4
    mpir8   = mpi_real8
    mpic16  = mpi_complex16
    mpipk   = mpi_packed
    mpimax  = mpi_max
    !
    ! Get my id  
    !
    call mpi_comm_rank (mpicom, iam, ier) 
    masterprocid = DEFAULT_MASTERPROC
    if (iam == DEFAULT_MASTERPROC) then 
       masterproc = .true.
    else
       masterproc = .false.
    end if
    !
    ! Get number of processors
    !
    max_len = mpi_max_processor_name
    call mpi_comm_size (mpicom, npes, ier)
    allocate ( displs(npes) )
    allocate ( lengths(npes) )
    allocate ( proc_name(max_len) )
    allocate ( proc_names(max_len*npes) )
 
    !
    ! Get processor names and send to root. 
    !
    call mpi_get_processor_name (tmp_name, length, ier)
    proc_name(:) = ' '
    do i = 1, length
       proc_name(i) = tmp_name(i:i)
    end do

    proc_names(:) = ' '
    lengths(:) = max_len
    do i=1,npes
       displs(i) = (i-1)*max_len
    enddo
    call fc_gathervc (proc_name,  max_len, mpichar, &
                      proc_names, lengths, displs, mpichar, &
                      0, mpicom, flow_cntl=-1)
    if (masterproc) then
       write(iulog,*) npes, 'pes participating in computation'
       write(iulog,*) '-----------------------------------'
       write(iulog,*) 'TASK#  NAME'
       do i=0,min(npes-1,256)  ! dont print too many of these
          do c=1,max_len
             tmp_name(c:c) = proc_names(i*max_len+c)
          enddo
          write(iulog,'(i3,2x,a)') i,trim(tmp_name)
       end do
       if(npes-1>256) then
          write(iulog,*) '... list truncated at 256'
       end if
    end if
    !
    ! Identify SMP nodes and process/SMP mapping.
    ! (Assume that processor names are SMP node names on SMP clusters.)
    !
    allocate ( proc_smp_map(0:npes-1) )
    if (masterproc) then
       allocate ( smp_names(0:npes-1) )
       smp_names(:) = ' '
       proc_smp_map(:) = -1
       !
       nsmps = 1
       do c=1,max_len
          tmp_name(c:c) = proc_names(c)
       enddo
       smp_names(0) = trim(tmp_name)
       proc_smp_map(0) = 0
       !
       do i=1,npes-1
          do c=1,max_len
             tmp_name(c:c) = proc_names(i*max_len+c)
          enddo

          j = 0
          done = .false.
          do while ((.not. done) .and. (j < nsmps))
             if (smp_names(j) .eq. trim(tmp_name)) then
                proc_smp_map(i) = j
                done = .true.
             endif
             j = j + 1
          enddo

          if (.not. done) then
             smp_names(nsmps) = trim(tmp_name)
             proc_smp_map(i) = nsmps
             nsmps = nsmps + 1
          endif

       enddo
       deallocate(smp_names)
    endif
    call mpibcast(nsmps, 1, mpiint, 0, mpicom)
    call mpibcast(proc_smp_map, npes, mpiint, 0, mpicom)
    !
    deallocate(displs)
    deallocate(lengths)
    deallocate(proc_name)
    deallocate(proc_names)

#else
    !	
    ! spmd is not defined
    !
    mpicom = mpicom_atm
    iam = 0
    masterprocid = 0
    masterproc = .true.
    npes = 1
    nsmps = 1
    allocate ( proc_smp_map(0:0) )
    proc_smp_map(:) = -1

#endif   

  end subroutine spmdinit

!----------------------------------------------------------------------- 
! 
! Purpose: swap communication routines used in performance portable 
!          distributed transpose algorithms.
! 
! Entry points:
!      swap_comm_defaultopts   Get default runtime options.
!      swap_comm_setopts       Set runtime options.
!      
!      swapm                   Implementation of swap for multiple
!                              messages using MPI point-to-point routines.
!
! Author: P. Worley
!-----------------------------------------------------------------------

#if (defined SPMD)

!
!========================================================================
!

   subroutine swap_comm_defaultopts(swap_comm_protocol_out, & 1
                                    swap_comm_maxreq_out  )
!----------------------------------------------------------------------- 
! Purpose: Return default runtime options
! Author: P. Worley (modelled after Tom Henderson's code)
!-----------------------------------------------------------------------
!------------------------------Arguments--------------------------------
   ! swap module communication protocol option
   integer, intent(out), optional :: swap_comm_protocol_out
   ! swap module communication nonblocking request maximum
   integer, intent(out), optional :: swap_comm_maxreq_out
!-----------------------------------------------------------------------
   if ( present(swap_comm_protocol_out) ) then
      swap_comm_protocol_out = def_comm_protocol
   endif
   if ( present(swap_comm_maxreq_out) ) then
      swap_comm_maxreq_out = def_comm_maxreq
   endif
!
   return
   end subroutine swap_comm_defaultopts
!
!========================================================================
!

   subroutine swap_comm_setopts(swap_comm_protocol_in, & 1
                                swap_comm_maxreq_in )
!----------------------------------------------------------------------- 
! Purpose: Set runtime options
! Author: P. Worley (modelled after Tom Henderson's code)
!-----------------------------------------------------------------------
!------------------------------Arguments--------------------------------
     ! swap module communication protocol option
     integer, intent(in), optional :: swap_comm_protocol_in
     ! swap module communication nonblocking request maximum
     integer, intent(in), optional :: swap_comm_maxreq_in
!-----------------------------------------------------------------------
     if ( present(swap_comm_protocol_in) ) then
        swap_comm_protocol = swap_comm_protocol_in
        if ((swap_comm_protocol < min_comm_protocol) .or. &
            (swap_comm_protocol > max_comm_protocol)) then
           if (masterproc) then
              write(iulog,*)                                        &
                 'SWAP_COMM_SETOPTS:  ERROR:  swap_comm_protocol=', &
                 swap_comm_protocol, ' is out of range.'
              write(iulog,*)                                        &
                 '  It must be between ', min_comm_protocol,' and ',&
                 max_comm_protocol
              write(iulog,*)                                        &
                 '  Using default value.'
           endif
           swap_comm_protocol = def_comm_protocol
        endif
     endif
!
     if ( present(swap_comm_maxreq_in) ) then
        swap_comm_maxreq = swap_comm_maxreq_in
     endif
!
     if (masterproc) then
        write(iulog,*) 'SWAP_COMM_SETOPTS: '
        write(iulog,*) '  swap_comm_protocol = ', swap_comm_protocol
        write(iulog,*) '  swap_comm_maxreq   = ', swap_comm_maxreq
      endif
!
     return
   end subroutine swap_comm_setopts
!
!========================================================================
!

   subroutine swapm (steps, nprocs, swapids,               & 1
                     sndbuf, sbuf_siz, sndlths, sdispls,   &
                     rcvbuf, rbuf_siz, rcvlths, rdispls,   &
                     comm, comm_protocol, comm_maxreq      )

!----------------------------------------------------------------------- 
! 
! Purpose: 
!   Reduced version of original swapm (for swap of multiple messages 
!   using MPI point-to-point routines), more efficiently implementing a 
!   subset of the swap protocols.
! 
! Method: 
! comm_protocol:
!  = 3 or 5: use nonblocking send
!  = 2 or 4: use blocking send
!  = 4 or 5: use handshaking protocol
! comm_maxreq:
!  =-1,0: do not limit number of outstanding send/receive requests
!     >0: do not allow more than min(comm_maxreq, steps) outstanding
!         nonblocking send requests or nonblocking receive requests
!
! Author of original version:  P. Worley
! Ported to CAM: P. Worley, December 2003
! Simplified version: P. Worley, October, 2008
! 
!-----------------------------------------------------------------------

!-----------------------------------------------------------------------
   implicit none
!---------------------------Input arguments--------------------------
!
   integer, intent(in)   :: steps              ! number of swaps to initiate
   integer, intent(in)   :: nprocs             ! size of communicator
   integer, intent(in)   :: sbuf_siz           ! size of send buffer
   integer, intent(in)   :: rbuf_siz           ! size of receive buffer
   integer, intent(in)   :: swapids(steps)     ! MPI process id of swap partners

   integer, intent(in)   :: sndlths(0:nprocs-1)! length of outgoing message
   integer, intent(in)   :: sdispls(0:nprocs-1)! offset from beginning of send
                                               !  buffer where outgoing messages
                                               !  should be sent from
   integer, intent(in)   :: rcvlths(0:nprocs-1)! length of incoming messages
   integer, intent(in)   :: rdispls(0:nprocs-1)! offset from beginning of receive 
                                               !  buffer where incoming messages
                                               !  should be placed
   real(r8), intent(in)  :: sndbuf(sbuf_siz)   ! outgoing message buffer
   real(r8), intent(out) :: rcvbuf(rbuf_siz)   ! incoming message buffer

   integer, intent(in)   :: comm               ! MPI communicator
   integer, intent(in)   :: comm_protocol      ! swap_comm protocol
   integer, intent(in)   :: comm_maxreq        ! maximum number of outstanding 
                                               !  nonblocking requests

!
!---------------------------Local workspace-----------------------------
!
   integer :: p                                ! process index
   integer :: istep                            ! loop index
   integer :: offset_s                         ! index of message beginning in 
                                               !  send buffer
   integer :: offset_r                         ! index of message beginning in 
                                               !  receive buffer
   integer :: sndids(steps)                    ! send request ids
   integer :: rcvids(steps)                    ! receive request ids
   integer :: hs_rcvids(steps)                 ! handshake receive request ids

   integer :: maxreq, maxreqh                  ! maximum number of outstanding 
                                               !  nonblocking requests (and half)
   integer :: hs                               ! handshake variable
   integer :: rstep                            ! "receive" step index

   logical :: handshake, sendd                 ! protocol option flags

   integer :: ier                              ! return error status    
   integer :: status(MPI_STATUS_SIZE)          ! MPI status 
!
!-------------------------------------------------------------------------------------
!
   if (steps .eq. 0) return

   ! identify communication protocol
   if ((comm_protocol < 2) .or. (comm_protocol > 5)) then
      sendd = .true.
      handshake = .true.
   else
      if ((comm_protocol .eq. 4) .or. (comm_protocol .eq. 5)) then
         handshake = .true.
      else
         handshake = .false.
      endif

      if ((comm_protocol .eq. 2) .or. (comm_protocol .eq. 4)) then
         sendd = .true.
      else
         sendd = .false.
      endif
   endif

   ! identify maximum number of outstanding nonblocking requests to permit
   if (steps .eq. 1) then
      maxreq  = 1
      maxreqh = 1
   else
      if (comm_maxreq >= -1) then
         maxreq = comm_maxreq
      else
         maxreq = steps
      endif

      if ((maxreq .le. steps) .and. (maxreq > 0)) then
         if (maxreq > 1) then
            maxreqh = maxreq/2
         else
            maxreq  = 2
            maxreqh = 1
         endif
      else
         maxreq  = steps
         maxreqh = steps
      endif
   endif

! Four protocol options:
!  (1) handshaking + blocking sends
   if ((handshake) .and. (sendd)) then

      ! Post initial handshake receive requests
      do istep=1,maxreq
         p = swapids(istep)
         if (sndlths(p) > 0) then
            call mpi_irecv( hs, 1, mpiint, p, iam, comm, &
                            hs_rcvids(istep), ier )
         endif
      enddo

      ! Post initial receive requests
      do istep=1,maxreq
         p = swapids(istep)
         if (rcvlths(p) > 0) then
            offset_r = rdispls(p)+1
            call mpi_irecv( rcvbuf(offset_r), rcvlths(p), mpir8, p, p, &
                            comm, rcvids(istep), ier )
            call mpi_send ( hs, 1, mpiint, p, p, comm, ier )
         endif
      enddo
      rstep = maxreq

      ! Send (and start receiving) data 
      do istep=1,steps
         p = swapids(istep)

         ! Submit new rsend request
         if (sndlths(p) > 0) then
            offset_s = sdispls(p)+1
            call mpi_wait  ( hs_rcvids(istep), status, ier )
            call mpi_rsend ( sndbuf(offset_s), sndlths(p), mpir8, p, iam, &
                             comm, ier )
         endif

         if (istep > maxreqh) then

            ! Wait for oldest irecv request to complete
            p = swapids(istep-maxreqh)
            if (rcvlths(p) > 0) then
               call mpi_wait( rcvids(istep-maxreqh), status, ier )
            endif

            if (rstep < steps) then
               rstep = rstep + 1
               p = swapids(rstep)

               ! Submit a new handshake irecv request
               if (sndlths(p) > 0) then
                  call mpi_irecv( hs, 1, mpiint, p, iam, comm, &
                                  hs_rcvids(rstep), ier )
               endif

               ! Submit a new irecv request
               if (rcvlths(p) > 0) then
                  offset_r = rdispls(p)+1
                  call mpi_irecv( rcvbuf(offset_r), rcvlths(p), mpir8, p, p, &
                                  comm, rcvids(rstep), ier )
                  call mpi_send ( hs, 1, mpiint, p, p, comm, ier )
               endif
            endif

         endif
!
      enddo

      ! wait for rest of receive requests to complete
      do istep=steps-maxreqh+1,steps
         p = swapids(istep)
         if (rcvlths(p) > 0) then
            call mpi_wait( rcvids(istep), status, ier )
         endif
      enddo

!  (2) handshaking + nonblocking sends
   elseif ((handshake) .and. (.not. sendd)) then

      ! Post initial handshake receive requests
      do istep=1,maxreq
         p = swapids(istep)
         if (sndlths(p) > 0) then
            call mpi_irecv( hs, 1, mpiint, p, iam, comm, &
                            hs_rcvids(istep), ier )
         endif
      enddo

      ! Post initial receive requests
      do istep=1,maxreq
         p = swapids(istep)
         if (rcvlths(p) > 0) then
            offset_r = rdispls(p)+1
            call mpi_irecv( rcvbuf(offset_r), rcvlths(p), mpir8, p, p, &
                            comm, rcvids(istep), ier )
            call mpi_send ( hs, 1, mpiint, p, p, comm, ier )
         endif
      enddo
      rstep = maxreq

      ! Send (and start receiving) data 
      do istep=1,steps
         p = swapids(istep)

         ! Submit new irsend request
         if (sndlths(p) > 0) then
            offset_s = sdispls(p)+1
            call mpi_wait  ( hs_rcvids(istep), status, ier )
            call mpi_irsend( sndbuf(offset_s), sndlths(p), mpir8, p, iam, &
                             comm, sndids(istep), ier )
         endif

         if (istep > maxreqh) then

            ! Wait for oldest irecv request to complete
            p = swapids(istep-maxreqh)
            if (rcvlths(p) > 0) then
               call mpi_wait( rcvids(istep-maxreqh), status, ier )
            endif

            if (rstep < steps) then
               rstep = rstep + 1
               p = swapids(rstep)

               ! Submit a new handshake irecv request
               if (sndlths(p) > 0) then
                  call mpi_irecv( hs, 1, mpiint, p, iam, comm, &
                                  hs_rcvids(rstep), ier )
               endif

               ! Submit a new irecv request
               if (rcvlths(p) > 0) then
                  offset_r = rdispls(p)+1
                  call mpi_irecv( rcvbuf(offset_r), rcvlths(p), mpir8, p, p, &
                                  comm, rcvids(rstep), ier )
                  call mpi_send ( hs, 1, mpiint, p, p, comm, ier )
               endif
            endif

            ! Wait for outstanding i(r)send request to complete
            p = swapids(istep-maxreqh)
            if (sndlths(p) > 0) then
               call mpi_wait( sndids(istep-maxreqh), status, ier )
            endif

         endif

      enddo

      ! wait for rest of send and receive requests to complete
      do istep=steps-maxreqh+1,steps
         p = swapids(istep)
         if (rcvlths(p) > 0) then
            call mpi_wait( rcvids(istep), status, ier )
         endif
         if (sndlths(p) > 0) then
            call mpi_wait( sndids(istep), status, ier )
         endif
      enddo

!  (3) no handshaking + blocking sends
   elseif ((.not. handshake) .and. (sendd)) then

      ! Post receive requests
      do istep=1,maxreq
         p = swapids(istep)
         if (rcvlths(p) > 0) then
            offset_r = rdispls(p)+1
            call mpi_irecv( rcvbuf(offset_r), rcvlths(p), mpir8, p, p, &
                            comm, rcvids(istep), ier )
         endif
      enddo
      rstep = maxreq

      ! Send (and start receiving) data 
      do istep=1,steps
         p = swapids(istep)

         ! Submit new send request
         if (sndlths(p) > 0) then
            offset_s = sdispls(p)+1
            call mpi_send( sndbuf(offset_s), sndlths(p), mpir8, p, iam, &
                           comm, ier )
         endif

         if (istep > maxreqh) then

            ! Wait for oldest irecv request to complete
            p = swapids(istep-maxreqh)
            if (rcvlths(p) > 0) then
               call mpi_wait( rcvids(istep-maxreqh), status, ier )
            endif

            ! Submit a new irecv request
            if (rstep < steps) then
               rstep = rstep + 1
               p = swapids(rstep)
               if (rcvlths(p) > 0) then
                  offset_r = rdispls(p)+1
                  call mpi_irecv( rcvbuf(offset_r), rcvlths(p), mpir8, p, p, &
                                  comm, rcvids(rstep), ier )
               endif
            endif

         endif

      enddo

      ! wait for rest of send and receive requests to complete
      do istep=steps-maxreqh+1,steps
         p = swapids(istep)
         if (rcvlths(p) > 0) then
            call mpi_wait( rcvids(istep), status, ier )
         endif
      enddo

!  (4) no handshaking + nonblocking sends
   elseif ((.not. handshake) .and. (.not. sendd)) then

      ! Post receive requests
      do istep=1,maxreq
         p = swapids(istep)
         if (rcvlths(p) > 0) then
            offset_r = rdispls(p)+1
            call mpi_irecv( rcvbuf(offset_r), rcvlths(p), mpir8, p, p, &
                            comm, rcvids(istep), ier )
         endif
      enddo
      rstep = maxreq

      ! Send (and start receiving) data 
      do istep=1,steps
         p = swapids(istep)

         ! Submit new isend request
         if (sndlths(p) > 0) then
            offset_s = sdispls(p)+1
            call mpi_isend( sndbuf(offset_s), sndlths(p), mpir8, p, iam, &
                            comm, sndids(istep), ier )
         endif

         if (istep > maxreqh) then

            ! Wait for oldest irecv request to complete
            p = swapids(istep-maxreqh)
            if (rcvlths(p) > 0) then
               call mpi_wait( rcvids(istep-maxreqh), status, ier )
            endif

            ! Submit a new irecv request
            if (rstep < steps) then
               rstep = rstep + 1
               p = swapids(rstep)
               if (rcvlths(p) > 0) then
                  offset_r = rdispls(p)+1
                  call mpi_irecv( rcvbuf(offset_r), rcvlths(p), mpir8, p, p, &
                                  comm, rcvids(rstep), ier )
               endif
            endif

            ! Wait for outstanding i(r)send request to complete
            p = swapids(istep-maxreqh)
            if (sndlths(p) > 0) then
               call mpi_wait( sndids(istep-maxreqh), status, ier )
            endif

         endif

      enddo

      ! wait for rest of send and receive requests to complete
      do istep=steps-maxreqh+1,steps
         p = swapids(istep)
         if (rcvlths(p) > 0) then
            call mpi_wait( rcvids(istep), status, ier )
         endif
         if (sndlths(p) > 0) then
            call mpi_wait( sndids(istep), status, ier )
         endif
      enddo

   endif

   return

   end subroutine swapm
!
!========================================================================
#endif

!----------------------------------------------------------------------- 
! 
! Purpose: gather collective with additional flow control, so as to 
!          be more robust when used with high process counts. 
!          If flow_cntl optional parameter 
!           < 0: use MPI_Gather
!          >= 0: use point-to-point with handshaking messages and 
!                preposting receive requests up to 
!                 max(min(1,flow_cntl),max_gather_block_size) 
!                ahead if optional flow_cntl parameter is present.
!                Otherwise, fc_gather_flow_cntl is used in its place.
!          Default value is 64.
! 
! Entry points:
!      fc_gather_defaultopts   Get default runtime options.
!      fc_gather_setopts       Set runtime options.
!      fc_gatherv       functionally equivalent to mpi_gatherv
!      fc_gathervr4     functionally equivalent to mpi_gatherv for real*4 data
!      fc_gathervint    functionally equivalent to mpi_gatherv for integer data
!      fc_gathervc      functionally equivalent to mpi_gatherv for character data
!
! Author: P. Worley
!-----------------------------------------------------------------------

#if (defined SPMD)

!
!========================================================================
!

   subroutine fc_gather_defaultopts( fc_gather_flow_cntl_out ) 1
!----------------------------------------------------------------------- 
! Purpose: Return default runtime options
! Author: P. Worley (modelled after Tom Henderson's code)
!-----------------------------------------------------------------------
!------------------------------Arguments--------------------------------
   ! fc_gather flow control option
   !   < 0: use MPI_Gather
   !  >= 0: use point-to-point with handshaking messages and 
   !       preposting receive requests up to 
   !       max(min(1,fc_gather_flow_cntl),max_gather_block_size) ahead
   !  Default value is defined by private parameter 
   !   max_gather_block_size
   integer, intent(out), optional :: fc_gather_flow_cntl_out
!-----------------------------------------------------------------------
   if ( present(fc_gather_flow_cntl_out) ) then
      fc_gather_flow_cntl_out = max_gather_block_size
   endif
!
   return
   end subroutine fc_gather_defaultopts
!
!========================================================================
!

   subroutine fc_gather_setopts( fc_gather_flow_cntl_in ) 1
!----------------------------------------------------------------------- 
! Purpose: Set runtime options
! Author: P. Worley (modelled after Tom Henderson's code)
!-----------------------------------------------------------------------
!------------------------------Arguments--------------------------------
   ! fc_gather flow control option
   integer, intent(in), optional :: fc_gather_flow_cntl_in
!-----------------------------------------------------------------------
   if ( present(fc_gather_flow_cntl_in) ) then
      fc_gather_flow_cntl = fc_gather_flow_cntl_in
   endif
!
   if (masterproc) then
      write(iulog,*) 'FC_GATHER_SETOPTS: '
      write(iulog,*) '  fc_gather_flow_cntl = ', fc_gather_flow_cntl
   endif
!
   return
   end subroutine fc_gather_setopts
!
!========================================================================
!

   subroutine fc_gatherv (sendbuf, sendcnt, sendtype, & 2,6
                         recvbuf, recvcnts, displs, recvtype, &
                         root, comm, flow_cntl )
!
! Collects different messages from each process on masterproc
!
   use shr_kind_mod, only: r8 => shr_kind_r8
   use mpishorthand
   use abortutils, only: endrun
   use cam_logfile,  only: iulog
#if defined( WRAP_MPI_TIMING )
   use perf_mod
#endif

   implicit none

   real (r8), intent(in)  :: sendbuf(*)
   real (r8), intent(out) :: recvbuf(*)
   integer, intent(in) :: displs(*)
   integer, intent(in) :: sendcnt
   integer, intent(in) :: sendtype
   integer, intent(in) :: recvcnts(*)
   integer, intent(in) :: recvtype
   integer, intent(in) :: root
   integer, intent(in) :: comm
   integer, optional, intent(in) :: flow_cntl

   real (r8) :: signal
   logical fc_gather         ! use explicit flow control?
   integer gather_block_size ! number of preposted receive requests

   integer :: mytid, mysize, mtag, p, q, i, count
   integer :: preposts, head, tail
   integer :: rcvid(max_gather_block_size)
   integer :: status(MPI_STATUS_SIZE)
   integer ier               ! MPI error code

   if ( present(flow_cntl) ) then
      if (flow_cntl >= 0) then
         gather_block_size = min(max(1,flow_cntl),max_gather_block_size)
         fc_gather = .true.
      else
         fc_gather = .false.
      endif
   else
      if (fc_gather_flow_cntl >= 0) then
         gather_block_size = min(max(1,fc_gather_flow_cntl),max_gather_block_size)
         fc_gather = .true.
      else
         fc_gather = .false.
      endif
   endif

   if (fc_gather) then
 
#if defined( WRAP_MPI_TIMING )
      call t_startf ('fc_gatherv_r8')
#endif
      call mpi_comm_rank (comm, mytid, ier)
      call mpi_comm_size (comm, mysize, ier)
      mtag = 0
      if (root .eq. mytid) then

! prepost gather_block_size irecvs, and start receiving data
         preposts = min(mysize-1, gather_block_size)
         head = 0
         count = 0
         do p=0, mysize-1
            if (p .ne. root) then
               q = p+1
               if (recvcnts(q) > 0) then
                  count = count + 1
                  if (count > preposts) then
                     tail = mod(head,preposts) + 1
                     call mpi_wait (rcvid(tail), status, ier)
                  end if
                  head = mod(head,preposts) + 1
                  call mpi_irecv ( recvbuf(displs(q)+1), recvcnts(q), &
                                   recvtype, p, mtag, comm, rcvid(head), &
                                   ier )
                  call mpi_send ( signal, 1, mpir8, p, mtag, comm, ier )
               end if
            end if
         end do

! copy local data
         q = mytid+1
         do i=1,sendcnt
            recvbuf(displs(q)+i) = sendbuf(i)
         enddo

! wait for final data
         do i=1,min(count,preposts)
            call mpi_wait (rcvid(i), status, ier)
         enddo

      else

         if (sendcnt > 0) then
            call mpi_recv ( signal, 1, mpir8, root, mtag, comm, &
                            status, ier )
            call mpi_rsend ( sendbuf, sendcnt, sendtype, root, mtag, &
                             comm, ier )
         end if

      endif
      if (ier /= mpi_success) then
         write(iulog,*)'fc_gatherv_r8 failed ier=',ier
         call endrun
      end if
#if defined( WRAP_MPI_TIMING )
      call t_stopf ('fc_gatherv_r8')
#endif

   else
 
#if defined( WRAP_MPI_TIMING )
      call t_startf ('mpi_gatherv')
#endif
      call mpi_gatherv (sendbuf, sendcnt, sendtype, &
                        recvbuf, recvcnts, displs, recvtype, &
                        root, comm, ier)
      if (ier /= mpi_success) then
         write(iulog,*)'mpi_gatherv failed ier=',ier
         call endrun
      end if
#if defined( WRAP_MPI_TIMING )
      call t_stopf ('mpi_gatherv')
#endif

   endif

   return
   end subroutine fc_gatherv
!
!========================================================================
!

   subroutine fc_gathervr4 (sendbuf, sendcnt, sendtype, & 2,6
                           recvbuf, recvcnts, displs, recvtype, &
                           root, comm, flow_cntl )
!
! Collects different messages from each process on masterproc
!
   use shr_kind_mod, only: r4 => shr_kind_r4, r8 => shr_kind_r8
   use mpishorthand
   use abortutils, only: endrun
   use cam_logfile,  only: iulog
#if defined( WRAP_MPI_TIMING )
   use perf_mod
#endif

   implicit none

   real (r4), intent(in)  :: sendbuf(*)
   real (r4), intent(out) :: recvbuf(*)
   integer, intent(in) :: displs(*)
   integer, intent(in) :: sendcnt
   integer, intent(in) :: sendtype
   integer, intent(in) :: recvcnts(*)
   integer, intent(in) :: recvtype
   integer, intent(in) :: root
   integer, intent(in) :: comm
   integer, optional, intent(in) :: flow_cntl

   real (r8) :: signal
   logical fc_gather         ! use explicit flow control?
   integer gather_block_size ! number of preposted receive requests

   integer :: mytid, mysize, mtag, p, q, i, count
   integer :: preposts, head, tail
   integer :: rcvid(max_gather_block_size)
   integer :: status(MPI_STATUS_SIZE)
   integer ier               ! MPI error code

   if ( present(flow_cntl) ) then
      if (flow_cntl >= 0) then
         gather_block_size = min(max(1,flow_cntl),max_gather_block_size)
         fc_gather = .true.
      else
         fc_gather = .false.
      endif
   else
      if (fc_gather_flow_cntl >= 0) then
         gather_block_size = min(max(1,fc_gather_flow_cntl),max_gather_block_size)
         fc_gather = .true.
      else
         fc_gather = .false.
      endif
   endif

   if (fc_gather) then
 
#if defined( WRAP_MPI_TIMING )
      call t_startf ('fc_gatherv_r4')
#endif
      call mpi_comm_rank (comm, mytid, ier)
      call mpi_comm_size (comm, mysize, ier)
      mtag = 0
      if (root .eq. mytid) then

! prepost gather_block_size irecvs, and start receiving data
         preposts = min(mysize-1, gather_block_size)
         head = 0
         count = 0
         do p=0, mysize-1
            if (p .ne. root) then
               q = p+1
               if (recvcnts(q) > 0) then
                  count = count + 1
                  if (count > preposts) then
                     tail = mod(head,preposts) + 1
                     call mpi_wait (rcvid(tail), status, ier)
                  end if
                  head = mod(head,preposts) + 1
                  call mpi_irecv ( recvbuf(displs(q)+1), recvcnts(q), &
                                   recvtype, p, mtag, comm, rcvid(head), &
                                   ier )
                  call mpi_send ( signal, 1, mpir8, p, mtag, comm, ier )
               end if
            end if
         end do

! copy local data
         q = mytid+1
         do i=1,sendcnt
            recvbuf(displs(q)+i) = sendbuf(i)
         enddo

! wait for final data
         do i=1,min(count,preposts)
            call mpi_wait (rcvid(i), status, ier)
         enddo

      else

         if (sendcnt > 0) then
            call mpi_recv ( signal, 1, mpir8, root, mtag, comm, &
                            status, ier )
            call mpi_rsend ( sendbuf, sendcnt, sendtype, root, mtag, &
                             comm, ier )
          end if

      endif
      if (ier /= mpi_success) then
         write(iulog,*)'fc_gatherv_r4 failed ier=',ier
         call endrun
      end if
#if defined( WRAP_MPI_TIMING )
      call t_stopf ('fc_gatherv_r4')
#endif

   else
 
#if defined( WRAP_MPI_TIMING )
      call t_startf ('mpi_gatherv')
#endif
      call mpi_gatherv (sendbuf, sendcnt, sendtype, &
                        recvbuf, recvcnts, displs, recvtype, &
                        root, comm, ier)
      if (ier /= mpi_success) then
         write(iulog,*)'mpi_gatherv failed ier=',ier
         call endrun
      end if
#if defined( WRAP_MPI_TIMING )
      call t_stopf ('mpi_gatherv')
#endif

   endif

   return
   end subroutine fc_gathervr4
!
!========================================================================
!

   subroutine fc_gathervint (sendbuf, sendcnt, sendtype, & 2,6
                            recvbuf, recvcnts, displs, recvtype, &
                            root, comm, flow_cntl )
!
! Collects different messages from each process on masterproc
!
   use shr_kind_mod, only: r8 => shr_kind_r8
   use mpishorthand
   use abortutils, only: endrun
   use cam_logfile,  only: iulog
#if defined( WRAP_MPI_TIMING )
   use perf_mod
#endif

   implicit none

   integer, intent(in)  :: sendbuf(*)
   integer, intent(out) :: recvbuf(*)
   integer, intent(in) :: displs(*)
   integer, intent(in) :: sendcnt
   integer, intent(in) :: sendtype
   integer, intent(in) :: recvcnts(*)
   integer, intent(in) :: recvtype
   integer, intent(in) :: root
   integer, intent(in) :: comm
   integer, optional, intent(in) :: flow_cntl

   real (r8) :: signal
   logical fc_gather         ! use explicit flow control?
   integer gather_block_size ! number of preposted receive requests

   integer :: mytid, mysize, mtag, p, q, i, count
   integer :: preposts, head, tail
   integer :: rcvid(max_gather_block_size)
   integer :: status(MPI_STATUS_SIZE)
   integer ier               ! MPI error code

   if ( present(flow_cntl) ) then
      if (flow_cntl >= 0) then
         gather_block_size = min(max(1,flow_cntl),max_gather_block_size)
         fc_gather = .true.
      else
         fc_gather = .false.
      endif
   else
      if (fc_gather_flow_cntl >= 0) then
         gather_block_size = min(max(1,fc_gather_flow_cntl),max_gather_block_size)
         fc_gather = .true.
      else
         fc_gather = .false.
      endif
   endif

   if (fc_gather) then
 
#if defined( WRAP_MPI_TIMING )
      call t_startf ('fc_gatherv_int')
#endif
      call mpi_comm_rank (comm, mytid, ier)
      call mpi_comm_size (comm, mysize, ier)
      mtag = 0
      if (root .eq. mytid) then

! prepost gather_block_size irecvs, and start receiving data
         preposts = min(mysize-1, gather_block_size)
         head = 0
         count = 0
         do p=0, mysize-1
            if (p .ne. root) then
               q = p+1
               if (recvcnts(q) > 0) then
                  count = count + 1
                  if (count > preposts) then
                     tail = mod(head,preposts) + 1
                     call mpi_wait (rcvid(tail), status, ier)
                  end if
                  head = mod(head,preposts) + 1
                  call mpi_irecv ( recvbuf(displs(q)+1), recvcnts(q), &
                                   recvtype, p, mtag, comm, rcvid(head), &
                                   ier )
                  call mpi_send ( signal, 1, mpir8, p, mtag, comm, ier )
               end if
            end if
         end do

! copy local data
         q = mytid+1
         do i=1,sendcnt
            recvbuf(displs(q)+i) = sendbuf(i)
         enddo

! wait for final data
         do i=1,min(count,preposts)
            call mpi_wait (rcvid(i), status, ier)
         enddo

      else

         if (sendcnt > 0) then
            call mpi_recv ( signal, 1, mpir8, root, mtag, comm, &
                            status, ier )
            call mpi_rsend ( sendbuf, sendcnt, sendtype, root, mtag, &
                             comm, ier )
          end if

      endif
      if (ier /= mpi_success) then
         write(iulog,*)'fc_gatherv_int failed ier=',ier
         call endrun
      end if
#if defined( WRAP_MPI_TIMING )
      call t_stopf ('fc_gatherv_int')
#endif

   else
 
#if defined( WRAP_MPI_TIMING )
      call t_startf ('mpi_gatherv')
#endif
      call mpi_gatherv (sendbuf, sendcnt, sendtype, &
                        recvbuf, recvcnts, displs, recvtype, &
                        root, comm, ier)
      if (ier /= mpi_success) then
         write(iulog,*)'mpi_gatherv failed ier=',ier
         call endrun
      end if
#if defined( WRAP_MPI_TIMING )
      call t_stopf ('mpi_gatherv')
#endif

   endif

   return
   end subroutine fc_gathervint
!
!========================================================================
!

   subroutine fc_gathervc (sendbuf, sendcnt, sendtype, & 1,6
                           recvbuf, recvcnts, displs, recvtype, &
                           root, comm, flow_cntl )
!
! Collects different messages from each process on masterproc
!
   use shr_kind_mod, only: r8 => shr_kind_r8
   use mpishorthand
   use abortutils, only: endrun
   use cam_logfile,  only: iulog
#if defined( WRAP_MPI_TIMING )
   use perf_mod
#endif

   implicit none

   character, intent(in)  :: sendbuf(*)
   character, intent(out) :: recvbuf(*)
   integer, intent(in) :: displs(*)
   integer, intent(in) :: sendcnt
   integer, intent(in) :: sendtype
   integer, intent(in) :: recvcnts(*)
   integer, intent(in) :: recvtype
   integer, intent(in) :: root
   integer, intent(in) :: comm
   integer, optional, intent(in) :: flow_cntl

   real (r8) :: signal
   logical fc_gather         ! use explicit flow control?
   integer gather_block_size ! number of preposted receive requests

   integer :: mytid, mysize, mtag, p, q, i, count
   integer :: preposts, head, tail
   integer :: rcvid(max_gather_block_size)
   integer :: status(MPI_STATUS_SIZE)
   integer ier               ! MPI error code

   if ( present(flow_cntl) ) then
      if (flow_cntl >= 0) then
         gather_block_size = min(max(1,flow_cntl),max_gather_block_size)
         fc_gather = .true.
      else
         fc_gather = .false.
      endif
   else
      if (fc_gather_flow_cntl >= 0) then
         gather_block_size = min(max(1,fc_gather_flow_cntl),max_gather_block_size)
         fc_gather = .true.
      else
         fc_gather = .false.
      endif
   endif

   if (fc_gather) then
 
#if defined( WRAP_MPI_TIMING )
      call t_startf ('fc_gatherv_char')
#endif
      call mpi_comm_rank (comm, mytid, ier)
      call mpi_comm_size (comm, mysize, ier)
      mtag = 0
      if (root .eq. mytid) then

! prepost gather_block_size irecvs, and start receiving data
         preposts = min(mysize-1, gather_block_size)
         head = 0
         count = 0
         do p=0, mysize-1
            if (p .ne. root) then
               q = p+1
               if (recvcnts(q) > 0) then
                  count = count + 1
                  if (count > preposts) then
                     tail = mod(head,preposts) + 1
                     call mpi_wait (rcvid(tail), status, ier)
                  end if
                  head = mod(head,preposts) + 1
                  call mpi_irecv ( recvbuf(displs(q)+1), recvcnts(q), &
                                   recvtype, p, mtag, comm, rcvid(head), &
                                   ier )
                  call mpi_send ( signal, 1, mpir8, p, mtag, comm, ier )
               end if
            end if
         end do

! copy local data
         q = mytid+1
         do i=1,sendcnt
            recvbuf(displs(q)+i) = sendbuf(i)
         enddo

! wait for final data
         do i=1,min(count,preposts)
            call mpi_wait (rcvid(i), status, ier)
         enddo

      else

         if (sendcnt > 0) then
            call mpi_recv ( signal, 1, mpir8, root, mtag, comm, &
                            status, ier )
            call mpi_rsend ( sendbuf, sendcnt, sendtype, root, mtag, &
                             comm, ier )
          end if

      endif
      if (ier /= mpi_success) then
         write(iulog,*)'fc_gatherv_char failed ier=',ier
         call endrun
      end if
#if defined( WRAP_MPI_TIMING )
      call t_stopf ('fc_gatherv_char')
#endif

   else
 
#if defined( WRAP_MPI_TIMING )
      call t_startf ('mpi_gatherv')
#endif
      call mpi_gatherv (sendbuf, sendcnt, sendtype, &
                        recvbuf, recvcnts, displs, recvtype, &
                        root, comm, ier)
      if (ier /= mpi_success) then
         write(iulog,*)'mpi_gatherv failed ier=',ier
         call endrun
      end if
#if defined( WRAP_MPI_TIMING )
      call t_stopf ('mpi_gatherv')
#endif

   endif

   return
   end subroutine fc_gathervc
!
!========================================================================
#endif

!----------------------------------------------------------------------- 
! 
! Purpose: implementations of MPI_Alltoall using different messaging
!          layers and different communication protocols, controlled
!          by option argument:
!  0: use mpi_alltoallv
!  1: use point-to-point MPI-1 two-sided implementation
!  2: use point-to-point MPI-2 one-sided implementation if supported, 
!       otherwise use MPI-1 implementation
!  3: use Co-Array Fortran implementation if supported, 
!       otherwise use MPI-1 implementation
!  otherwise use mpi_sendrecv implementation
! 
! Entry points:
!      altalltoallv
!
! Author: P. Worley
!-----------------------------------------------------------------------

#if (defined SPMD)
!****************************************************************

   subroutine altalltoallv (option, mytid, nprocs, steps, dests, & 4,5
                 sendbuf, sbuf_siz, sendcnts, sdispls, sendtype, &
                 recvbuf, rbuf_siz, recvcnts, rdispls, recvtype, &
                 msgtag, pdispls, desttype, recvwin, comm)
!
! All-to-all scatter/gather implemented using Co-Array
! Fortran one-sided commands, MPI-2 one sided commands,
! SWAP module MPI-1 commands, MPI_ALLTOALLV or MPI_SENDRECV.
!
#if defined( WRAP_MPI_TIMING )
   use perf_mod
#endif

   implicit none

   integer, intent(in) :: option               ! 0: mpi_alltoallv
                                               ! 1: swap package
                                               ! 2: mpi2 
                                               ! 3: co-array fortran
                                       ! otherwise: sendrecv
   integer, intent(in) :: mytid
   integer, intent(in) :: nprocs
   integer, intent(in) :: steps
   integer, intent(in) :: dests(steps)
   integer, intent(in) :: sbuf_siz
   integer, intent(in) :: sendcnts(0:nprocs-1)
   integer, intent(in) :: sdispls(0:nprocs-1)
   integer, intent(in) :: sendtype
   integer, intent(in) :: rbuf_siz
   integer, intent(in) :: recvcnts(0:nprocs-1)
   integer, intent(in) :: rdispls(0:nprocs-1)
   integer, intent(in) :: recvtype
   integer, intent(in) :: msgtag
   integer, intent(in) :: pdispls(0:nprocs-1)   ! displacement at 
                                                !  destination
   integer, intent(in) :: desttype
   integer, intent(in) :: recvwin
   integer, intent(in) :: comm

#if (defined CAF)
   real (r8), intent(in)  :: sendbuf(sbuf_siz)[*]
   real (r8), intent(out) :: recvbuf(rbuf_siz)[*]

   integer :: istart, iend, jstart, jend
#else
   real (r8), intent(in)  :: sendbuf(sbuf_siz)
   real (r8), intent(out) :: recvbuf(rbuf_siz)
#endif

   integer :: loption          ! local copy of option
   integer :: dest             ! MPI remote process id
   integer :: ier              ! MPI error code
   integer :: i                ! loop index
   integer :: sndids(steps)    ! nonblocking MPI send request ids
   integer :: rcvids(steps)    ! nonblocking MPI recv request ids
   integer :: status(MPI_STATUS_SIZE)
#if ( defined MPI2)
   integer(kind=MPI_ADDRESS_KIND) :: ddispls
#endif

!-----------------------------------------------------------------------
   loption = option

!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!  using MPI library collective MPI_ALLTOALLV
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
   if (loption .eq. 0) then

#if defined( WRAP_MPI_TIMING )
      call t_startf ('mpi_alltoallv')
#endif
      call mpi_alltoallv (sendbuf, sendcnts, sdispls, sendtype, &
                          recvbuf, recvcnts, rdispls, recvtype, &
                          comm, ier)
!
! test for error
      if (ier/=mpi_success) then
         write(iulog,*)'altalltoallv (mpi_alltoallv) failed ier=',ier
         call endrun
      end if
#if defined( WRAP_MPI_TIMING )
      call t_stopf ('mpi_alltoallv')
#endif

   else

!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!  Co-Array Fortran implementation of alltoallv
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      if (loption .eq. 3) then

#if ( defined CAF )
#if defined( WRAP_MPI_TIMING )
         call t_startf ('caf_alltoallv')
#endif
         if (this_image() .ne. (mytid+1)) then
            call endrun('altalltoallv (caf_alltoallv) failed: MPI id .ne. CAF id')
         endif

         call sync_images()

!DIR$ CONCURRENT
         do i = 1, steps
            dest = dests(i)
            if (sendcnts(dest) > 0) then
               istart = sdispls(dest)+1
               iend   = istart+sendcnts(dest)-1
               jstart = pdispls(dest)+1
               jend   = jstart+sendcnts(dest)-1
               recvbuf(jstart:jend)[dest+1] = sendbuf(istart:iend)
            end if
         end do

         call sync_images()
#if defined( WRAP_MPI_TIMING )
         call t_stopf ('caf_alltoallv')
#endif
#else
         loption = -1
#endif

!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!  MPI-2 one-sided implementation of alltoallv
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      elseif (loption .eq. 2) then
#ifdef MPI2
#if defined( WRAP_MPI_TIMING )
         call t_startf ('mpi2_alltoallv')
#endif
         call mpi_win_fence(0,recvwin,ier)
         do i=1, steps
            dest = dests(i)
            if (sendcnts(dest) > 0) then
               ddispls = pdispls(dest)
               call mpi_put(sendbuf(sdispls(dest)+1), sendcnts(dest), sendtype, &
                            dest, ddispls, sendcnts(dest), desttype, &
                            recvwin, ier)
            endif
         end do
!
! wait for completion
         call mpi_win_fence(0,recvwin,ier)
         if (ier/=mpi_success) then
            write(iulog,*)'altalltoallv (mpi2_alltoallv) failed ier=',ier
            call endrun
         end if
#if defined( WRAP_MPI_TIMING )
         call t_stopf ('mpi2_alltoallv')
#endif
#else
         loption = -1
#endif

!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!  MPI-1 two-sided implementation of alltoallv
!  using SWAP routines
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      elseif (loption .eq. 1) then
#if defined( WRAP_MPI_TIMING )
         call t_startf ('swap_alltoallv')
#endif

         call swapm(steps, nprocs, dests,                      &
                    sendbuf, sbuf_siz, sendcnts, sdispls,      &
                    recvbuf, rbuf_siz, recvcnts, rdispls,      &
                    comm, swap_comm_protocol, swap_comm_maxreq )
!
#if defined( WRAP_MPI_TIMING )
         call t_stopf ('swap_alltoallv')
#endif

!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!  Anything else defined to be MPI_SENDRECV implementation
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      else
!
         loption = -1
!
      endif

!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!  MPI_SENDRECV implementation of alltoallv
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      if (loption .eq. -1) then
#if defined( WRAP_MPI_TIMING )
         call t_startf ('mpi1_alltoallv')
#endif
         do i=1, steps
            dest = dests(i)
            call mpi_sendrecv (sendbuf(sdispls(dest)+1), sendcnts(dest), &
                               sendtype, dest, msgtag,                   &
                               recvbuf(rdispls(dest)+1), recvcnts(dest), &
                               recvtype, dest, msgtag,                   &
                               comm, status, ier)
         end do
!
! test for error
         if (ier/=mpi_success) then
            write(iulog,*)'altalltoallv (mpi1_alltoallv) failed ier=',ier
            call endrun
         end if

#if defined( WRAP_MPI_TIMING )
         call t_stopf ('mpi1_alltoallv')
#endif
      endif

!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!  Local copy (if necessary)
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      if (sendcnts(mytid) > 0) then
         do i=1,sendcnts(iam)
            recvbuf(rdispls(mytid)+i) = sendbuf(sdispls(mytid)+i)
         enddo
      endif
!
   endif
!
   return
   end subroutine altalltoallv

#endif

end module spmd_utils