diff -pruN 2.20.0-1/asyncssh/auth.py 2.21.0-1/asyncssh/auth.py
--- 2.20.0-1/asyncssh/auth.py	2025-02-17 18:29:16.000000000 +0000
+++ 2.21.0-1/asyncssh/auth.py	2025-05-03 13:32:15.000000000 +0000
@@ -1,4 +1,4 @@
-# Copyright (c) 2013-2024 by Ron Frederick <ronf@timeheart.net> and others.
+# Copyright (c) 2013-2025 by Ron Frederick <ronf@timeheart.net> and others.
 #
 # This program and the accompanying materials are made available under
 # the terms of the Eclipse Public License v2.0 which accompanies this
@@ -548,10 +548,10 @@ class ServerAuth(Auth):
 
         self._conn.send_userauth_failure(partial_success)
 
-    def send_success(self) -> None:
+    async def send_success(self) -> None:
         """Send a user authentication success response"""
 
-        self._conn.send_userauth_success()
+        await self._conn.send_userauth_success()
 
 
 class _ServerNullAuth(ServerAuth):
@@ -596,7 +596,7 @@ class _ServerGSSKexAuth(ServerAuth):
                 (await self._conn.validate_gss_principal(self._username,
                                                          self._gss.user,
                                                          self._gss.host))):
-            self.send_success()
+            await self.send_success()
         else:
             self.send_failure()
 
@@ -650,7 +650,7 @@ class _ServerGSSMICAuth(ServerAuth):
         if (await self._conn.validate_gss_principal(self._username,
                                                     self._gss.user,
                                                     self._gss.host)):
-            self.send_success()
+            await self.send_success()
         else:
             self.send_failure()
 
@@ -757,7 +757,7 @@ class _ServerHostBasedAuth(ServerAuth):
                                                       key_data, client_host,
                                                       client_username,
                                                       msg, signature)):
-            self.send_success()
+            await self.send_success()
         else:
             self.send_failure()
 
@@ -795,7 +795,7 @@ class _ServerPublicKeyAuth(ServerAuth):
         if (await self._conn.validate_public_key(self._username, key_data,
                                                  msg, signature)):
             if sig_present:
-                self.send_success()
+                await self.send_success()
             else:
                 self.send_packet(MSG_USERAUTH_PK_OK, String(algorithm),
                                  String(key_data))
@@ -832,9 +832,9 @@ class _ServerKbdIntAuth(ServerAuth):
 
         challenge = await self._conn.get_kbdint_challenge(self._username,
                                                           lang, submethods)
-        self._send_challenge(challenge)
+        await self._send_challenge(challenge)
 
-    def _send_challenge(self, challenge: KbdIntChallenge) -> None:
+    async def _send_challenge(self, challenge: KbdIntChallenge) -> None:
         """Send a keyboard interactive authentication request"""
 
         if isinstance(challenge, (tuple, list)):
@@ -848,7 +848,7 @@ class _ServerKbdIntAuth(ServerAuth):
                              String(instruction), String(lang),
                              UInt32(num_prompts), *prompts_bytes)
         elif challenge:
-            self.send_success()
+            await self.send_success()
         else:
             self.send_failure()
 
@@ -857,7 +857,7 @@ class _ServerKbdIntAuth(ServerAuth):
 
         next_challenge = \
             await self._conn.validate_kbdint_response(self._username, responses)
-        self._send_challenge(next_challenge)
+        await self._send_challenge(next_challenge)
 
     def _process_info_response(self, _pkttype: int, _pktid: int,
                                packet: SSHPacket) -> None:
@@ -922,7 +922,7 @@ class _ServerPasswordAuth(ServerAuth):
                     await self._conn.validate_password(self._username, password)
 
             if result:
-                self.send_success()
+                await self.send_success()
             else:
                 self.send_failure()
         except PasswordChangeRequired as exc:
diff -pruN 2.20.0-1/asyncssh/channel.py 2.21.0-1/asyncssh/channel.py
--- 2.20.0-1/asyncssh/channel.py	2025-02-17 18:29:16.000000000 +0000
+++ 2.21.0-1/asyncssh/channel.py	2025-05-03 13:32:15.000000000 +0000
@@ -26,6 +26,7 @@ import codecs
 import inspect
 import re
 import signal as _signal
+import sys
 from types import MappingProxyType
 from typing import TYPE_CHECKING, Any, AnyStr, Awaitable, Callable
 from typing import Dict, Generic, Iterable, List, Mapping, Optional
@@ -225,7 +226,13 @@ class SSHChannel(Generic[AnyStr], SSHPac
             self._request_waiters = []
 
         if self._session is not None:
-            self._session.connection_lost(exc)
+            # pylint: disable=broad-except
+            try:
+                self._session.connection_lost(exc)
+            except Exception:
+                self.logger.debug1('Uncaught exception in session ignored',
+                                   exc_info=sys.exc_info)
+
             self._session = None
 
         self._close_event.set()
@@ -2058,16 +2065,16 @@ class SSHTCPChannel(SSHForwardChannel, G
             SSHTCPSession[AnyStr]:
         """Create a new outbound TCP session"""
 
-        return (await self._open_tcp(session_factory, b'direct-tcpip',
-                                     host, port, orig_host, orig_port))
+        return await self._open_tcp(session_factory, b'direct-tcpip',
+                                    host, port, orig_host, orig_port)
 
     async def accept(self, session_factory: SSHTCPSessionFactory[AnyStr],
                      host: str, port: int, orig_host: str,
                      orig_port: int) -> SSHTCPSession[AnyStr]:
         """Create a new forwarded TCP session"""
 
-        return (await self._open_tcp(session_factory, b'forwarded-tcpip',
-                                     host, port, orig_host, orig_port))
+        return await self._open_tcp(session_factory, b'forwarded-tcpip',
+                                    host, port, orig_host, orig_port)
 
     def set_inbound_peer_names(self, dest_host: str, dest_port: int,
                                orig_host: str, orig_port: int) -> None:
diff -pruN 2.20.0-1/asyncssh/connection.py 2.21.0-1/asyncssh/connection.py
--- 2.20.0-1/asyncssh/connection.py	2025-02-17 18:29:16.000000000 +0000
+++ 2.21.0-1/asyncssh/connection.py	2025-05-03 13:32:15.000000000 +0000
@@ -1,4 +1,4 @@
-# Copyright (c) 2013-2024 by Ron Frederick <ronf@timeheart.net> and others.
+# Copyright (c) 2013-2025 by Ron Frederick <ronf@timeheart.net> and others.
 #
 # This program and the accompanying materials are made available under
 # the terms of the Eclipse Public License v2.0 which accompanies this
@@ -1075,7 +1075,13 @@ class SSHConnection(SSHPacketHandler, as
             self._wait = None
 
         if self._owner: # pragma: no branch
-            self._owner.connection_lost(exc)
+            # pylint: disable=broad-except
+            try:
+                self._owner.connection_lost(exc)
+            except Exception:
+                self.logger.debug1('Uncaught exception in owner ignored',
+                                   exc_info=sys.exc_info)
+
             self._owner = None
 
         self._cancel_login_timer()
@@ -1196,7 +1202,7 @@ class SSHConnection(SSHPacketHandler, as
 
         return self._server
 
-    def is_closed(self):
+    def is_closed(self) -> bool:
         """Return whether the connection is closed"""
 
         return self._close_event.is_set()
@@ -2069,7 +2075,7 @@ class SSHConnection(SSHPacketHandler, as
         self.send_packet(MSG_USERAUTH_FAILURE, NameList(methods),
                          Boolean(partial_success))
 
-    def send_userauth_success(self) -> None:
+    async def send_userauth_success(self) -> None:
         """Send a user authentication success response"""
 
         self.logger.info('Auth for user %s succeeded', self._username)
@@ -2086,13 +2092,15 @@ class SSHConnection(SSHPacketHandler, as
         self._set_keepalive_timer()
 
         if self._owner: # pragma: no branch
-            self._owner.auth_completed()
+            result = self._owner.auth_completed()
+
+            if inspect.isawaitable(result):
+                await result
 
         if self._acceptor:
             result = self._acceptor(self)
 
             if inspect.isawaitable(result):
-                assert result is not None
                 self.create_task(result)
 
             self._acceptor = None
@@ -2506,7 +2514,7 @@ class SSHConnection(SSHPacketHandler, as
                 result = await cast(Awaitable[bool], result)
 
             if not result:
-                self.send_userauth_success()
+                await self.send_userauth_success()
                 return
 
         if not self._owner: # pragma: no cover
@@ -2603,7 +2611,6 @@ class SSHConnection(SSHPacketHandler, as
                 result = self._acceptor(self)
 
                 if inspect.isawaitable(result):
-                    assert result is not None
                     self.create_task(result)
 
                 self._acceptor = None
@@ -3229,9 +3236,8 @@ class SSHConnection(SSHPacketHandler, as
                     raise ChannelOpenError(OPEN_ADMINISTRATIVELY_PROHIBITED,
                                            'Connection forwarding denied')
 
-            return (await self.create_connection(session_factory,
-                                                 dest_host, dest_port,
-                                                 orig_host, orig_port))
+            return await self.create_connection(session_factory, dest_host,
+                                                dest_port, orig_host, orig_port)
 
         if (listen_host, listen_port) == (dest_host, dest_port):
             self.logger.info('Creating local TCP forwarder on %s',
@@ -4130,7 +4136,6 @@ class SSHClientConnection(SSHConnection)
                                                 retained, revoked)
 
         if inspect.isawaitable(result):
-            assert result is not None
             await result
 
         self._report_global_response(True)
@@ -5052,8 +5057,8 @@ class SSHClientConnection(SSHConnection)
 
         """
 
-        return (await create_connection(client_factory, host, port,
-                                        tunnel=self, **kwargs)) # type: ignore
+        return await create_connection(client_factory, host, port,
+                                       tunnel=self, **kwargs) # type: ignore
 
     @async_context_manager
     async def connect_ssh(self, host: str, port: DefTuple[int] = (),
@@ -5321,8 +5326,7 @@ class SSHClientConnection(SSHConnection)
                     raise ChannelOpenError(OPEN_ADMINISTRATIVELY_PROHIBITED,
                                            'Connection forwarding denied')
 
-            return (await self.create_unix_connection(session_factory,
-                                                      dest_path))
+            return await self.create_unix_connection(session_factory, dest_path)
 
         self.logger.info('Creating local TCP forwarder from %s to %s',
                          (listen_host, listen_port), dest_path)
@@ -6301,6 +6305,9 @@ class SSHServerConnection(SSHConnection)
             if not result:
                 raise ChannelOpenError(OPEN_CONNECT_FAILED, 'Session refused')
 
+            if isinstance(result, SSHClientConnection):
+                result = self.forward_tunneled_session(result)
+
             if isinstance(result, tuple):
                 chan, result = result
             else:
@@ -6356,6 +6363,10 @@ class SSHServerConnection(SSHConnection)
         if result is True:
             result = cast(SSHTCPSession[bytes],
                           self.forward_connection(dest_host, dest_port))
+        elif isinstance(result, SSHClientConnection):
+            result = cast(Awaitable[SSHTCPSession[bytes]],
+                          self.forward_tunneled_connection(
+                              result, dest_host, dest_port))
 
         if isinstance(result, tuple):
             chan, result = result
@@ -6502,6 +6513,10 @@ class SSHServerConnection(SSHConnection)
         if result is True:
             result = cast(SSHUNIXSession[bytes],
                           self.forward_unix_connection(dest_path))
+        elif isinstance(result, SSHClientConnection):
+            result = cast(Awaitable[SSHUNIXSession[bytes]],
+                          self.forward_tunneled_unix_connection(
+                              result, dest_path))
 
         if isinstance(result, tuple):
             chan, result = result
@@ -6617,10 +6632,14 @@ class SSHServerConnection(SSHConnection)
             result = False
 
         if not result:
-            raise ChannelOpenError(OPEN_CONNECT_FAILED, 'Connection refused')
+            raise ChannelOpenError(OPEN_CONNECT_FAILED,
+                                   'TUN/TAP request refused')
 
         if result is True:
             result = cast(SSHTunTapSession, self.forward_tuntap(mode, unit))
+        elif isinstance(result, SSHClientConnection):
+            result = cast(Awaitable[SSHTunTapSession],
+                          self.forward_tunneled_tuntap(result, mode, unit))
 
         if isinstance(result, tuple):
             chan, result = result
@@ -7175,6 +7194,76 @@ class SSHServerConnection(SSHConnection)
 
         return SSHReader[bytes](session, chan), SSHWriter[bytes](session, chan)
 
+    async def forward_tunneled_session(
+            self, conn: SSHClientConnection) -> SSHServerProcess:
+        """Forward a tunneled session between SSH connections"""
+
+        async def process_factory(process: SSHServerProcess) -> None:
+            """Return an upstream process used to forward the session"""
+
+            encoding, errors = process.channel.get_encoding()
+
+            upstream_process: SSHClientProcess = await conn.create_process(
+                command=process.command, subsystem=process.subsystem,
+                env=process.env, term_type=process.term_type,
+                term_size=process.term_size, term_modes=process.term_modes,
+                encoding=encoding, errors=errors, stdin=process.stdin,
+                stdout=process.stdout, stderr=process.stderr)
+
+            await upstream_process.wait_closed()
+
+        self.logger.info('  Forwarding session via SSH tunnel')
+
+        return SSHServerProcess(process_factory, None, MIN_SFTP_VERSION, False)
+
+    async def forward_tunneled_connection(
+            self, conn: SSHClientConnection,
+            dest_host: str, dest_port: int) -> SSHForwarder:
+        """Forward a tunneled TCP connection between SSH connections"""
+
+        _, peer = await conn.create_connection(
+            cast(SSHTCPSessionFactory[bytes], SSHForwarder),
+            dest_host, dest_port)
+
+        self.logger.info('  Forwarding TCP connection to %s via SSH tunnel',
+                         (dest_host, dest_port))
+
+        return SSHForwarder(cast(SSHForwarder, peer))
+
+    async def forward_tunneled_unix_connection(
+            self, conn: SSHClientConnection,
+            dest_path: str) -> SSHForwarder:
+        """Forward a tunneled UNIX connection between SSH connections"""
+
+        _, peer = await conn.create_unix_connection(
+            cast(SSHUNIXSessionFactory[bytes], SSHForwarder), dest_path)
+
+        self.logger.info('  Forwarding UNIX connection to %s via SSH tunnel',
+                         dest_path)
+
+        return SSHForwarder(cast(SSHForwarder, peer))
+
+    async def forward_tunneled_tuntap(
+            self, conn: SSHClientConnection,
+            mode: int, unit: Optional[int]) -> SSHForwarder:
+        """Forward a TUN/TAP connection between SSH connections"""
+
+        if mode == SSH_TUN_MODE_POINTTOPOINT:
+            create_func = conn.create_tun
+            layer = 3
+        else:
+            create_func = conn.create_tap
+            layer = 2
+
+        transport, peer = await create_func(
+            cast(SSHTunTapSessionFactory, SSHForwarder), unit)
+        interface = transport.get_extra_info('interface')
+
+        self.logger.info('  Forwarding layer %d traffic to %s via SSH tunnel',
+                         layer, interface)
+
+        return SSHForwarder(cast(SSHForwarder, peer))
+
 
 class SSHConnectionOptions(Options, Generic[_Options]):
     """SSH connection options"""
@@ -8473,11 +8562,11 @@ class SSHServerConnectionOptions(SSHConn
            errors of data exchanged on sessions on this server, defaulting
            to 'strict'.
        :param sftp_factory: (optional)
-           A `callable` which returns an :class:`SFTPServer` object that
-           will be created each time an SFTP session is requested by the
-           client, or `True` to use the base :class:`SFTPServer` class
-           to handle SFTP requests. If not specified, SFTP sessions are
-           rejected by default.
+           A `callable` or coroutine which returns an :class:`SFTPServer`
+           object that will be created each time an SFTP session is
+           requested by the client, or `True` to use the base
+           :class:`SFTPServer` class to handle SFTP requests. If not
+           specified, SFTP sessions are rejected by default.
        :param sftp_version: (optional)
            The maximum version of the SFTP protocol to support, currently
            either 3 or 4, defaulting to 3.
@@ -8624,7 +8713,7 @@ class SSHServerConnectionOptions(SSHConn
        :type session_factory: `callable` or coroutine
        :type encoding: `str` or `None`
        :type errors: `str`
-       :type sftp_factory: `callable`
+       :type sftp_factory: `callable` or coroutine
        :type sftp_version: `int`
        :type allow_scp: `bool`
        :type window: `int`
diff -pruN 2.20.0-1/asyncssh/kex_dh.py 2.21.0-1/asyncssh/kex_dh.py
--- 2.20.0-1/asyncssh/kex_dh.py	2025-02-17 18:29:16.000000000 +0000
+++ 2.21.0-1/asyncssh/kex_dh.py	2025-05-03 13:32:15.000000000 +0000
@@ -1,4 +1,4 @@
-# Copyright (c) 2013-2024 by Ron Frederick <ronf@timeheart.net> and others.
+# Copyright (c) 2013-2025 by Ron Frederick <ronf@timeheart.net> and others.
 #
 # This program and the accompanying materials are made available under
 # the terms of the Eclipse Public License v2.0 which accompanies this
@@ -343,6 +343,9 @@ class _KexDHGex(_KexDHBase):
         if self._conn.is_client():
             raise ProtocolError('Unexpected kex request msg')
 
+        if self._p:
+            raise ProtocolError('Kex DH group already requested')
+
         self._gex_data = packet.get_remaining_payload()
 
         if pkttype == MSG_KEX_DH_GEX_REQUEST_OLD:
@@ -377,6 +380,9 @@ class _KexDHGex(_KexDHBase):
         if self._conn.is_server():
             raise ProtocolError('Unexpected kex group msg')
 
+        if self._p:
+            raise ProtocolError('Kex DH group already sent')
+
         p = packet.get_mpint()
         g = packet.get_mpint()
         packet.check_end()
@@ -529,6 +535,7 @@ class _KexGSSBase(_KexDHBase):
 
         self._gss = conn.get_gss_context()
         self._token: Optional[bytes] = None
+        self._host_key_msg_ok = False
         self._host_key_data = b''
 
     def _check_secure(self) -> None:
@@ -621,6 +628,8 @@ class _KexGSSBase(_KexDHBase):
         if self._conn.is_client() and self._gss.complete:
             raise ProtocolError('Unexpected kexgss continue msg')
 
+        self._host_key_msg_ok = False
+
         await self._process_token(token)
 
         if self._conn.is_server() and self._gss.complete:
@@ -636,6 +645,8 @@ class _KexGSSBase(_KexDHBase):
         if self._conn.is_server():
             raise ProtocolError('Unexpected kexgss complete msg')
 
+        self._host_key_msg_ok = False
+
         self._parse_server_key(packet)
         mic = packet.get_string()
         token_present = packet.get_boolean()
@@ -662,6 +673,10 @@ class _KexGSSBase(_KexDHBase):
                          packet: SSHPacket) -> None:
         """Process a GSS hostkey message"""
 
+        if not self._host_key_msg_ok:
+            raise ProtocolError('Unexpected kexgss hostkey msg')
+
+        self._host_key_msg_ok = False
         self._host_key_data = packet.get_string()
         packet.check_end()
 
@@ -685,6 +700,7 @@ class _KexGSSBase(_KexDHBase):
         """Start GSS key exchange"""
 
         if self._conn.is_client():
+            self._host_key_msg_ok = True
             await self._process_token()
             await super().start()
 
diff -pruN 2.20.0-1/asyncssh/misc.py 2.21.0-1/asyncssh/misc.py
--- 2.20.0-1/asyncssh/misc.py	2025-02-17 18:29:16.000000000 +0000
+++ 2.21.0-1/asyncssh/misc.py	2025-05-03 13:32:15.000000000 +0000
@@ -1,4 +1,4 @@
-# Copyright (c) 2013-2024 by Ron Frederick <ronf@timeheart.net> and others.
+# Copyright (c) 2013-2025 by Ron Frederick <ronf@timeheart.net> and others.
 #
 # This program and the accompanying materials are made available under
 # the terms of the Eclipse Public License v2.0 which accompanies this
@@ -46,6 +46,17 @@ from .constants import DISC_NO_MORE_AUTH
 from .constants import DISC_PROTOCOL_ERROR, DISC_PROTOCOL_VERSION_NOT_SUPPORTED
 from .constants import DISC_SERVICE_NOT_AVAILABLE
 
+_pywin32_available = False
+
+if sys.platform == 'win32': # pragma: no cover
+    try:
+        import msvcrt
+        import win32file
+        import winioctlcon
+        _pywin32_available = True
+    except ImportError:
+        pass
+
 if sys.platform != 'win32': # pragma: no branch
     import fcntl
     import struct
@@ -305,6 +316,19 @@ def write_file(filename: FilePath, data:
         return f.write(data)
 
 
+if sys.platform == 'win32' and _pywin32_available: # pragma: no cover
+    def make_sparse_file(file_obj: IO) -> None:
+        """Enable sparse file support on a file on Windows"""
+
+        handle = msvcrt.get_osfhandle(file_obj.fileno())
+
+        win32file.DeviceIoControl(handle, winioctlcon.FSCTL_SET_SPARSE,
+                                  b'', 0, None)
+else:
+    def make_sparse_file(_file_obj: IO) -> None:
+        """Sparse files are automatically enabled on non-Windows systems"""
+
+
 def _parse_units(value: str, suffixes: Mapping[str, int], label: str) -> float:
     """Parse a series of integers followed by unit suffixes"""
 
diff -pruN 2.20.0-1/asyncssh/scp.py 2.21.0-1/asyncssh/scp.py
--- 2.20.0-1/asyncssh/scp.py	2025-02-17 18:29:16.000000000 +0000
+++ 2.21.0-1/asyncssh/scp.py	2025-05-03 13:32:15.000000000 +0000
@@ -1,4 +1,4 @@
-# Copyright (c) 2017-2024 by Ron Frederick <ronf@timeheart.net> and others.
+# Copyright (c) 2017-2025 by Ron Frederick <ronf@timeheart.net> and others.
 #
 # This program and the accompanying materials are made available under
 # the terms of the Eclipse Public License v2.0 which accompanies this
@@ -24,8 +24,9 @@
 
 import argparse
 import asyncio
-import posixpath
+import inspect
 from pathlib import PurePath
+import posixpath
 import shlex
 import string
 import sys
@@ -40,9 +41,8 @@ from .logging import SSHLogger
 from .misc import BytesOrStr, FilePath, HostPort, MaybeAwait
 from .misc import async_context_manager, plural
 from .sftp import SFTPAttrs, SFTPGlob, SFTPName, SFTPServer, SFTPServerFS
-from .sftp import SFTPFileProtocol, SFTPError, SFTPFailure, SFTPBadMessage
-from .sftp import SFTPConnectionLost, SFTPErrorHandler, SFTPProgressHandler
-from .sftp import local_fs
+from .sftp import SFTPError, SFTPFailure, SFTPBadMessage, SFTPConnectionLost
+from .sftp import SFTPErrorHandler, SFTPProgressHandler, local_fs
 
 
 if TYPE_CHECKING:
@@ -60,6 +60,27 @@ _SCPConnPath = Union[Tuple[_SCPConn, _SC
 _SCP_BLOCK_SIZE = 256*1024    # 256 KiB
 
 
+class _SCPFileProtocol(Protocol):
+    """Protocol for accessing a file during an SCP copy"""
+
+    async def __aenter__(self) -> Self:
+        """Allow _SCPFileProtocol to be used as an async context manager"""
+
+    async def __aexit__(self, _exc_type: Optional[Type[BaseException]],
+                        _exc_value: Optional[BaseException],
+                        _traceback: Optional[TracebackType]) -> bool:
+        """Wait for file close when used as an async context manager"""
+
+    async def read(self, size: int, offset: int) -> bytes:
+        """Read data from the local file"""
+
+    async def write(self, data: bytes, offset: int) -> int:
+        """Write data to the local file"""
+
+    async def close(self) -> None:
+        """Close the local file"""
+
+
 class _SCPFSProtocol(Protocol):
     """Protocol for accessing a filesystem during an SCP copy"""
 
@@ -86,7 +107,7 @@ class _SCPFSProtocol(Protocol):
         """Create a directory"""
 
     @async_context_manager
-    async def open(self, path: bytes, mode: str) -> SFTPFileProtocol:
+    async def open(self, path: bytes, mode: str) -> _SCPFileProtocol:
         """Open a file"""
 
 
@@ -1066,22 +1087,46 @@ async def scp(srcpaths: Union[_SCPConnPa
             await dstconn.wait_closed()
 
 
-def run_scp_server(sftp_server: SFTPServer, command: str,
+async def _scp_handler(sftp_server: MaybeAwait[SFTPServer],
+                       args: _SCPArgs, reader: 'SSHReader[bytes]',
+                       writer: 'SSHWriter[bytes]') -> None:
+    """Run an SCP server to handle this request"""
+
+    if inspect.isawaitable(sftp_server):
+        sftp_server = await sftp_server
+
+    fs = SFTPServerFS(sftp_server)
+
+    handler: Union[_SCPSource, _SCPSink]
+
+    if args.source:
+        handler = _SCPSource(fs, reader, writer, args.preserve,
+                             args.recurse, error_handler=False, server=True)
+    else:
+        handler = _SCPSink(fs, reader, writer, args.must_be_dir,
+                           args.preserve, args.recurse,
+                           error_handler=False, server=True)
+
+    try:
+        await handler.run(args.path)
+    finally:
+        result = sftp_server.exit()
+
+        if inspect.isawaitable(result):
+            await result
+
+
+def run_scp_server(sftp_server: MaybeAwait[SFTPServer], command: str,
                    stdin: 'SSHReader[bytes]', stdout: 'SSHWriter[bytes]',
                    stderr: 'SSHWriter[bytes]') -> MaybeAwait[None]:
     """Return a handler for an SCP server session"""
 
-    async def _run_handler() -> None:
-        """Run an SCP server to handle this request"""
-
-        try:
-            await handler.run(args.path)
-        finally:
-            sftp_server.exit()
-
     try:
         args = _SCPArgParser().parse(command)
     except ValueError as exc:
+        if inspect.iscoroutine(sftp_server):
+            sftp_server.close()
+
         stdin.logger.info('Error starting SCP server: %s', str(exc))
         stderr.write(b'scp: ' + str(exc).encode('utf-8') + b'\n')
         cast('SSHServerChannel', stderr.channel).exit(1)
@@ -1089,15 +1134,4 @@ def run_scp_server(sftp_server: SFTPServ
 
     stdin.logger.info('Starting SCP server, args: %s', command[4:].strip())
 
-    fs = SFTPServerFS(sftp_server)
-
-    handler: Union[_SCPSource, _SCPSink]
-
-    if args.source:
-        handler = _SCPSource(fs, stdin, stdout, args.preserve, args.recurse,
-                             error_handler=False, server=True)
-    else:
-        handler = _SCPSink(fs, stdin, stdout, args.must_be_dir, args.preserve,
-                           args.recurse, error_handler=False, server=True)
-
-    return _run_handler()
+    return _scp_handler(sftp_server, args, stdin, stdout)
diff -pruN 2.20.0-1/asyncssh/server.py 2.21.0-1/asyncssh/server.py
--- 2.20.0-1/asyncssh/server.py	2025-02-17 18:29:16.000000000 +0000
+++ 2.21.0-1/asyncssh/server.py	2025-05-03 13:32:15.000000000 +0000
@@ -1,4 +1,4 @@
-# Copyright (c) 2013-2024 by Ron Frederick <ronf@timeheart.net> and others.
+# Copyright (c) 2013-2025 by Ron Frederick <ronf@timeheart.net> and others.
 #
 # This program and the accompanying materials are made available under
 # the terms of the Eclipse Public License v2.0 which accompanies this
@@ -31,26 +31,36 @@ from .stream import SSHSocketSessionFact
 
 if TYPE_CHECKING:
     # pylint: disable=cyclic-import
-    from .connection import SSHServerConnection, SSHAcceptHandler
+    from .connection import SSHClientConnection, SSHServerConnection
+    from .connection import SSHAcceptHandler
     from .channel import SSHServerChannel, SSHTCPChannel, SSHUNIXChannel
     from .channel import SSHTunTapChannel
     from .session import SSHServerSession, SSHTCPSession, SSHUNIXSession
     from .session import SSHTunTapSession
 
 
-_NewSession = Union[bool, 'SSHServerSession', SSHServerSessionFactory,
-                    Tuple['SSHServerChannel', 'SSHServerSession'],
-                    Tuple['SSHServerChannel', SSHServerSessionFactory]]
-_NewTCPSession = Union[bool, 'SSHTCPSession', SSHSocketSessionFactory,
-                       Tuple['SSHTCPChannel', 'SSHTCPSession'],
-                       Tuple['SSHTCPChannel', SSHSocketSessionFactory]]
-_NewUNIXSession = Union[bool, 'SSHUNIXSession', SSHSocketSessionFactory,
-                        Tuple['SSHUNIXChannel', 'SSHUNIXSession'],
-                        Tuple['SSHUNIXChannel', SSHSocketSessionFactory]]
-_NewTunTapSession = Union[bool, 'SSHTunTapSession', SSHSocketSessionFactory,
-                          Tuple['SSHTunTapChannel', 'SSHTunTapSession'],
-                          Tuple['SSHTunTapChannel', SSHSocketSessionFactory]]
-_NewListener = Union[bool, 'SSHAcceptHandler', SSHListener]
+_NewSession = Union[
+    bool, 'SSHClientConnection',
+    MaybeAwait['SSHServerSession'], SSHServerSessionFactory,
+    Tuple['SSHServerChannel', MaybeAwait['SSHServerSession']],
+    Tuple['SSHServerChannel', SSHServerSessionFactory]]
+_NewTCPSession = Union[
+    bool, 'SSHClientConnection',
+    MaybeAwait['SSHTCPSession'], SSHSocketSessionFactory,
+    Tuple['SSHTCPChannel', MaybeAwait['SSHTCPSession']],
+    Tuple['SSHTCPChannel', SSHSocketSessionFactory]]
+_NewUNIXSession = Union[
+    bool, 'SSHClientConnection',
+    MaybeAwait['SSHUNIXSession'], SSHSocketSessionFactory,
+    Tuple['SSHUNIXChannel', MaybeAwait['SSHUNIXSession']],
+    Tuple['SSHUNIXChannel', SSHSocketSessionFactory]]
+_NewTunTapSession = Union[
+    bool, 'SSHClientConnection',
+    MaybeAwait['SSHTunTapSession'], SSHSocketSessionFactory,
+    Tuple['SSHTunTapChannel', MaybeAwait['SSHTunTapSession']],
+    Tuple['SSHTunTapChannel', SSHSocketSessionFactory]]
+_NewTCPListener = Union[bool, 'SSHAcceptHandler', MaybeAwait[SSHListener]]
+_NewUNIXListener = Union[bool, MaybeAwait[SSHListener]]
 
 
 class SSHServer:
@@ -157,7 +167,7 @@ class SSHServer:
 
         return True # pragma: no cover
 
-    def auth_completed(self) -> None:
+    def auth_completed(self) -> MaybeAwait[None]:
         """Authentication was completed successfully
 
            This method is called when authentication has completed
@@ -167,6 +177,9 @@ class SSHServer:
            user before any sessions are opened or forwarding requests
            are handled.
 
+           If blocking operations need to be performed when authentication
+           completes, this method may be defined as a coroutine.
+
         """
 
     def validate_gss_principal(self, username: str, user_principal: str,
@@ -742,6 +755,11 @@ class SSHServer:
            :exc:`ChannelOpenError` exception with the reason for
            the failure.
 
+           If the application wishes to tunnel the connection over
+           another SSH connection, this method should return an
+           :class:`SSHClientConnection` connected to the desired
+           tunnel host.
+
            If the application wishes to process the data on the
            connection itself, this method should return either an
            :class:`SSHTCPSession` object which can be used to process the
@@ -795,7 +813,7 @@ class SSHServer:
         return False # pragma: no cover
 
     def server_requested(self, listen_host: str,
-                         listen_port: int) -> MaybeAwait[_NewListener]:
+                         listen_port: int) -> MaybeAwait[_NewTCPListener]:
         """Handle a request to listen on a TCP/IP address and port
 
            This method is called when a client makes a request to
@@ -857,6 +875,11 @@ class SSHServer:
            :exc:`ChannelOpenError` exception with the reason for
            the failure.
 
+           If the application wishes to tunnel the connection over
+           another SSH connection, this method should return an
+           :class:`SSHClientConnection` connected to the desired
+           tunnel host.
+
            If the application wishes to process the data on the
            connection itself, this method should return either an
            :class:`SSHUNIXSession` object which can be used to process the
@@ -901,7 +924,7 @@ class SSHServer:
         return False # pragma: no cover
 
     def unix_server_requested(self, listen_path: str) -> \
-            MaybeAwait[_NewListener]:
+            MaybeAwait[_NewUNIXListener]:
         """Handle a request to listen on a UNIX domain socket
 
            This method is called when a client makes a request to
@@ -951,7 +974,7 @@ class SSHServer:
            by the server. Applications wishing to accept such tunnels must
            override this method.
 
-           To allow standard path forwarding of data on the connection to the
+           To allow standard forwarding of data on the connection to the
            requested TUN device, this method should return `True`.
 
            To reject this request, this method should return `False`
@@ -959,6 +982,11 @@ class SSHServer:
            :exc:`ChannelOpenError` exception with the reason for
            the failure.
 
+           If the application wishes to tunnel the data over another
+           SSH connection, this method should return an
+           :class:`SSHClientConnection` connected to the desired
+           tunnel host.
+
            If the application wishes to process the data on the
            connection itself, this method should return either an
            :class:`SSHTunTapSession` object which can be used to process the
@@ -1009,14 +1037,19 @@ class SSHServer:
            by the server. Applications wishing to accept such tunnels must
            override this method.
 
-           To allow standard path forwarding of data on the connection to the
-           requested TUN device, this method should return `True`.
+           To allow standard forwarding of data on the connection to the
+           requested TAP device, this method should return `True`.
 
            To reject this request, this method should return `False`
            to send back a "Connection refused" response or raise an
            :exc:`ChannelOpenError` exception with the reason for
            the failure.
 
+           If the application wishes to tunnel the data over another
+           SSH connection, this method should return an
+           :class:`SSHClientConnection` connected to the desired
+           tunnel host.
+
            If the application wishes to process the data on the
            connection itself, this method should return either an
            :class:`SSHTunTapSession` object which can be used to process the
diff -pruN 2.20.0-1/asyncssh/sftp.py 2.21.0-1/asyncssh/sftp.py
--- 2.20.0-1/asyncssh/sftp.py	2025-02-17 18:29:16.000000000 +0000
+++ 2.21.0-1/asyncssh/sftp.py	2025-05-03 13:32:15.000000000 +0000
@@ -1,4 +1,4 @@
-# Copyright (c) 2015-2024 by Ron Frederick <ronf@timeheart.net> and others.
+# Copyright (c) 2015-2025 by Ron Frederick <ronf@timeheart.net> and others.
 #
 # This program and the accompanying materials are made available under
 # the terms of the Eclipse Public License v2.0 which accompanies this
@@ -106,13 +106,27 @@ from .logging import SSHLogger
 
 from .misc import BytesOrStr, Error, FilePath, MaybeAwait, OptExcInfo, Record
 from .misc import ConnectionLost
-from .misc import async_context_manager, get_symbol_names, hide_empty, plural
+from .misc import async_context_manager, get_symbol_names, hide_empty
+from .misc import make_sparse_file, plural
 
 from .packet import Boolean, Byte, String, UInt16, UInt32, UInt64
 from .packet import PacketDecodeError, SSHPacket, SSHPacketLogger
 
 from .version import __author__, __version__
 
+_pywin32_available = False
+
+if sys.platform == 'win32': # pragma: no cover
+    try:
+        import msvcrt
+        import pywintypes
+        import win32file
+        import winerror
+        import winioctlcon
+        _pywin32_available = True
+    except ImportError:
+        pass
+
 
 if TYPE_CHECKING:
     # pylint: disable=cyclic-import
@@ -167,6 +181,7 @@ _COPY_DATA_BLOCK_SIZE = 256*1024
 
 _MAX_SFTP_REQUESTS = 128
 _MAX_READDIR_NAMES = 128
+_MAX_SPARSE_RANGES = 128
 
 _NSECS_IN_SEC = 1_000_000_000
 
@@ -224,6 +239,10 @@ class SFTPFileProtocol(Protocol):
                         _traceback: Optional[TracebackType]) -> bool:
         """Wait for file close when used as an async context manager"""
 
+    def request_ranges(self, offset: int, length: int) -> \
+            AsyncIterator[Tuple[int, int]]:
+        """Return file ranges containing data"""
+
     async def read(self, size: int, offset: int) -> bytes:
         """Read data from the local file"""
 
@@ -624,6 +643,61 @@ def _setstat(path: Union[int, _SFTPPath]
             pass
 
 
+if sys.platform == 'win32' and _pywin32_available: # pragma: no cover
+    async def _request_ranges(file_obj: _SFTPFileObj, offset: int,
+                              length: int) -> AsyncIterator[Tuple[int, int]]:
+        """Return file ranges containing data on Windows"""
+
+        handle = msvcrt.get_osfhandle(file_obj.fileno())
+        bufsize = _MAX_SPARSE_RANGES * 16
+
+        while True:
+            try:
+                query_range = offset.to_bytes(8, 'little') + \
+                              length.to_bytes(8, 'little')
+
+                ranges = win32file.DeviceIoControl(
+                    handle, winioctlcon.FSCTL_QUERY_ALLOCATED_RANGES,
+                    query_range, bufsize, None)
+            except pywintypes.error as exc:
+                if exc.args[0] == winerror.ERROR_MORE_DATA:
+                    bufsize *= 2
+                else:
+                    raise
+            else:
+                break
+
+        for pos in range(0, len(ranges), 16):
+            offset = int.from_bytes(ranges[pos:pos+8], 'little')
+            length = int.from_bytes(ranges[pos+8:pos+16], 'little')
+            yield offset, length
+elif hasattr(os, 'SEEK_DATA'):
+    async def _request_ranges(file_obj: _SFTPFileObj, offset: int,
+                              length: int) -> AsyncIterator[Tuple[int, int]]:
+        """Return file ranges containing data"""
+
+        end = offset
+        limit = offset + length
+
+        try:
+            while end < limit:
+                start = file_obj.seek(end, os.SEEK_DATA)
+                end = min(file_obj.seek(start, os.SEEK_HOLE), limit)
+                yield start, end - start
+        except OSError as exc: # pragma: no cover
+            if exc.errno != errno.ENXIO:
+                raise
+else: # pragma: no cover
+    async def _request_ranges(file_obj: _SFTPFileObj, offset: int,
+                              length: int) -> AsyncIterator[Tuple[int, int]]:
+        """Sparse files aren't supported - return the full input range"""
+
+        # pylint: disable=unused-argument
+
+        if length:
+            yield offset, length
+
+
 class _SFTPParallelIO(Generic[_T]):
     """Parallelize I/O requests on files
 
@@ -767,11 +841,13 @@ class _SFTPFileCopier(_SFTPParallelIO[in
 
     """
 
-    def __init__(self, block_size: int, max_requests: int, offset: int,
-                 total_bytes: int, srcfs: _SFTPFSProtocol,
-                 dstfs: _SFTPFSProtocol, srcpath: bytes, dstpath: bytes,
+    def __init__(self, block_size: int, max_requests: int, total_bytes: int,
+                 sparse: bool, srcfs: _SFTPFSProtocol, dstfs: _SFTPFSProtocol,
+                 srcpath: bytes, dstpath: bytes,
                  progress_handler: SFTPProgressHandler):
-        super().__init__(block_size, max_requests, offset, total_bytes)
+        super().__init__(block_size, max_requests, 0, 0)
+
+        self._sparse = sparse
 
         self._srcfs = srcfs
         self._dstfs = dstfs
@@ -801,6 +877,12 @@ class _SFTPFileCopier(_SFTPParallelIO[in
     async def run(self) -> None:
         """Perform parallel file copy"""
 
+        async def _request_nonsparse_range(offset: int, length: int) -> \
+                AsyncIterator[Tuple[int, int]]:
+            """Return the entire file as the range to copy"""
+
+            yield offset, length
+
         try:
             self._src = await self._srcfs.open(self._srcpath, 'rb',
                                                block_size=0)
@@ -809,30 +891,39 @@ class _SFTPFileCopier(_SFTPParallelIO[in
 
             if self._progress_handler and self._total_bytes == 0:
                 self._progress_handler(self._srcpath, self._dstpath, 0, 0)
+                return
+
+            if self._sparse:
+                ranges = self._src.request_ranges(0, self._total_bytes)
+            else:
+                ranges = _request_nonsparse_range(0, self._total_bytes)
 
             if self._srcfs == self._dstfs and \
                     isinstance(self._srcfs, SFTPClient) and \
                     self._srcfs.supports_remote_copy:
-                await self._srcfs.remote_copy(cast(SFTPClientFile, self._src),
-                                              cast(SFTPClientFile, self._dst))
-
-                self._bytes_copied = self._total_bytes
-
-                if self._progress_handler:
-                    self._progress_handler(self._srcpath, self._dstpath,
-                                           self._bytes_copied,
-                                           self._total_bytes)
+                async for offset, length in ranges:
+                    await self._srcfs.remote_copy(
+                        cast(SFTPClientFile, self._src),
+                        cast(SFTPClientFile, self._dst),
+                        offset, length, offset)
+
+                    self._bytes_copied += length
+
+                    if self._progress_handler:
+                        self._progress_handler(self._srcpath, self._dstpath,
+                                               self._bytes_copied,
+                                               self._total_bytes)
             else:
-                async for _, datalen in self.iter():
-                    if datalen:
+                async for self._offset, self._bytes_left in ranges:
+                    async for _, datalen in self.iter():
                         self._bytes_copied += datalen
 
-                        if self._progress_handler:
+                        if self._progress_handler and datalen != 0:
                             self._progress_handler(self._srcpath, self._dstpath,
                                                    self._bytes_copied,
                                                    self._total_bytes)
 
-                if self._bytes_copied != self._total_bytes:
+                if self._bytes_copied != self._total_bytes and not self._sparse:
                     exc = SFTPFailure('Unexpected EOF during file copy')
 
                     setattr(exc, 'filename', self._srcpath)
@@ -2081,20 +2172,16 @@ class SFTPLimits(Record):
     max_write_len: int
     max_open_handles: int
 
-    def encode(self, sftp_version: int) -> bytes:
+    def encode(self) -> bytes:
         """Encode SFTP server limits in an SSH packet"""
 
-        # pylint: disable=unused-argument
-
         return (UInt64(self.max_packet_len) + UInt64(self.max_read_len) +
                 UInt64(self.max_write_len) + UInt64(self.max_open_handles))
 
     @classmethod
-    def decode(cls, packet: SSHPacket, sftp_version: int) -> 'SFTPLimits':
+    def decode(cls, packet: SSHPacket) -> Self:
         """Decode bytes in an SSH packet as SFTP server limits"""
 
-        # pylint: disable=unused-argument
-
         max_packet_len = packet.get_uint64()
         max_read_len = packet.get_uint64()
         max_write_len = packet.get_uint64()
@@ -2103,6 +2190,51 @@ class SFTPLimits(Record):
         return cls(max_packet_len, max_read_len,
                    max_write_len, max_open_handles)
 
+    def log(self, logger: SSHLogger, label: str) -> None:
+        """Log sending or receiving SFTP limits"""
+
+        logger.debug1('%s erver limits:', label)
+        logger.debug1('  Max packet len: %d', self.max_packet_len)
+        logger.debug1('  Max read len: %d', self.max_read_len)
+        logger.debug1('  Max write len: %d', self.max_write_len)
+        logger.debug1('  Max open handles: %d', self.max_open_handles)
+
+
+class SFTPRanges(Record):
+    """SFTP sparse file ranges"""
+
+    ranges: List[Tuple[int, int]]
+    at_end: bool
+
+    def encode(self) -> bytes:
+        """Encode sparse file ranges in an SSH packet"""
+
+        return (UInt32(len(self.ranges)) +
+                b''.join((UInt64(offset) + UInt64(length)
+                          for offset, length in self.ranges)) +
+                Boolean(self.at_end))
+
+    @classmethod
+    def decode(cls, packet: SSHPacket) -> Self:
+        """Decode bytes in an SSH packet as sparse file ranges"""
+
+        count = packet.get_uint32()
+        ranges = [(packet.get_uint64(), packet.get_uint64())
+                  for _ in range(count)]
+        at_end = packet.get_boolean()
+
+        return cls(ranges, at_end)
+
+    def log(self, logger: SSHLogger, label: str) -> None:
+        """Log sending or receiving sparse file ranges"""
+
+        logger.debug1('%s %s%s', label,
+                      plural(len(self.ranges), 'sparse file range'),
+                      ' (at end)' if self.at_end else '')
+
+        for offset, length in self.ranges:
+            logger.debug1('  offset %d, length %d', offset, length)
+
 
 class SFTPGlob:
     """SFTP glob matcher"""
@@ -2315,7 +2447,8 @@ class SFTPHandler(SSHPacketLogger):
         FXP_READLINK:             FXP_NAME,
         b'statvfs@openssh.com':   FXP_EXTENDED_REPLY,
         b'fstatvfs@openssh.com':  FXP_EXTENDED_REPLY,
-        b'limits@openssh.com':    FXP_EXTENDED_REPLY
+        b'limits@openssh.com':    FXP_EXTENDED_REPLY,
+        b'ranges@asyncssh.com':   FXP_EXTENDED_REPLY
     }
 
     def __init__(self, reader: 'SSHReader[bytes]', writer: 'SSHWriter[bytes]'):
@@ -2405,14 +2538,6 @@ class SFTPHandler(SSHPacketLogger):
                 self.logger.debug1('  %s%s%s', name,
                                    ': ' if data else '', data)
 
-    def _log_limits(self, limits: SFTPLimits) -> None:
-        """Log SFTP server limits"""
-
-        self.logger.debug1('  Max packet len: %d', limits.max_packet_len)
-        self.logger.debug1('  Max read len: %d', limits.max_read_len)
-        self.logger.debug1('  Max write len: %d', limits.max_write_len)
-        self.logger.debug1('  Max open handles: %d', limits.max_open_handles)
-
     async def _process_packet(self, pkttype: int, pktid: int,
                               packet: SSHPacket) -> None:
         """Abstract method for processing SFTP packets"""
@@ -2488,6 +2613,7 @@ class SFTPClientHandler(SFTPHandler):
         self._supports_lsetstat = False
         self._supports_limits = False
         self._supports_copy_data = False
+        self._supports_ranges = False
 
     @property
     def version(self) -> int:
@@ -2616,7 +2742,7 @@ class SFTPClientHandler(SFTPHandler):
         if self._version < 6:
             packet.check_end()
 
-        self.logger.debug1('Received %s%s', plural(len(names), 'name'),
+        self.logger.debug1('Received %s%s', plural(count, 'name'),
                            ' (at end)' if at_end else '')
 
         for name in names:
@@ -2716,6 +2842,8 @@ class SFTPClientHandler(SFTPHandler):
                 self._supports_limits = True
             elif name == b'copy-data' and data == b'1':
                 self._supports_copy_data = True
+            elif name == b'ranges@asyncssh.com' and data == b'1':
+                self._supports_ranges = True
 
         if version == 3:
             # Check if the server has a buggy SYMLINK implementation
@@ -2736,11 +2864,10 @@ class SFTPClientHandler(SFTPHandler):
             packet = cast(SSHPacket, await self._make_request(
                 b'limits@openssh.com'))
 
-            limits = SFTPLimits.decode(packet, self._version)
+            limits = SFTPLimits.decode(packet)
             packet.check_end()
 
-            self.logger.debug1('Received server limits:')
-            self._log_limits(limits)
+            limits.log(self.logger, 'Received')
 
             if limits.max_read_len:
                 self.limits.max_read_len = limits.max_read_len
@@ -3134,6 +3261,28 @@ class SFTPClientHandler(SFTPHandler):
         else:
             raise SFTPOpUnsupported('copy-data not supported')
 
+    async def request_ranges(self, handle: bytes, offset: int,
+                             length: int) -> SFTPRanges:
+        """Request file ranges containing data in a remote file"""
+
+        if self._supports_ranges:
+            self.logger.debug1('Sending ranges request for handle %s, '
+                               'offset %d, length %d', handle.hex(),
+                               offset, length)
+
+            packet = cast(SSHPacket, await self._make_request(
+                b'ranges@asyncssh.com', String(handle),
+                UInt64(offset), UInt64(length)))
+
+            result = SFTPRanges.decode(packet)
+            packet.check_end()
+
+            result.log(self.logger, 'Received')
+
+            return result
+        else:
+            return SFTPRanges([(offset, length)], True)
+
     def exit(self) -> None:
         """Handle a request to close the SFTP session"""
 
@@ -3209,6 +3358,35 @@ class SFTPClientFile:
         attrs = await self.stat()
         return attrs.size or 0
 
+    async def request_ranges(self, offset: int, length: int) -> \
+            AsyncIterator[Tuple[int, int]]:
+        """Return file ranges containing data in a remote file"""
+
+        next_offset = offset
+        next_length = length
+        end = offset + length
+        at_end = False
+
+        try:
+            while not at_end:
+                result = await self._handler.request_ranges(
+                    self.handle, next_offset, next_length)
+
+                if result.ranges:
+                    # pylint: disable=undefined-loop-variable
+
+                    for range_offset, range_length in result.ranges:
+                        yield range_offset, range_length
+
+                    next_offset = range_offset + range_length
+                    next_length = end - next_offset
+                else: # pragma: no cover
+                    break
+
+                at_end = result.at_end
+        except SFTPEOFError:
+            pass
+
     async def read(self, size: int = -1,
                    offset: Optional[int] = None) -> AnyStr:
         """Read data from the remote file
@@ -3765,7 +3943,7 @@ class SFTPClient:
     async def _copy(self, srcfs: _SFTPFSProtocol, dstfs: _SFTPFSProtocol,
                     srcpath: bytes, dstpath: bytes, srcattrs: SFTPAttrs,
                     preserve: bool, recurse: bool, follow_symlinks: bool,
-                    block_size: int, max_requests: int,
+                    sparse: bool, block_size: int, max_requests: int,
                     progress_handler: SFTPProgressHandler,
                     error_handler: SFTPErrorHandler,
                     remote_only: bool) -> None:
@@ -3803,9 +3981,9 @@ class SFTPClient:
 
                     await self._copy(srcfs, dstfs, srcfile, dstfile,
                                      srcname.attrs, preserve, recurse,
-                                     follow_symlinks, block_size, max_requests,
-                                     progress_handler, error_handler,
-                                     remote_only)
+                                     follow_symlinks, sparse, block_size,
+                                     max_requests, progress_handler,
+                                     error_handler, remote_only)
 
                 self.logger.info('  Finished copy of directory %s to %s',
                                  srcpath, dstpath)
@@ -3823,9 +4001,10 @@ class SFTPClient:
                 if remote_only and not self.supports_remote_copy:
                     raise SFTPOpUnsupported('Remote copy not supported')
 
-                await _SFTPFileCopier(block_size, max_requests, 0,
-                                      srcattrs.size or 0, srcfs, dstfs,
-                                      srcpath, dstpath, progress_handler).run()
+                await _SFTPFileCopier(block_size, max_requests,
+                                      srcattrs.size or 0, sparse,
+                                      srcfs, dstfs, srcpath, dstpath,
+                                      progress_handler).run()
 
             if preserve:
                 attrs = await srcfs.stat(srcpath,
@@ -3856,7 +4035,7 @@ class SFTPClient:
     async def _begin_copy(self, srcfs: _SFTPFSProtocol, dstfs: _SFTPFSProtocol,
                           srcpaths: _SFTPPaths, dstpath: Optional[_SFTPPath],
                           copy_type: str, expand_glob: bool, preserve: bool,
-                          recurse: bool, follow_symlinks: bool,
+                          recurse: bool, follow_symlinks: bool, sparse: bool,
                           block_size: int, max_requests: int,
                           progress_handler: SFTPProgressHandler,
                           error_handler: SFTPErrorHandler,
@@ -3919,15 +4098,15 @@ class SFTPClient:
                 dstfile = dstpath
 
             await self._copy(srcfs, dstfs, srcfile, dstfile, srcname.attrs,
-                             preserve, recurse, follow_symlinks, block_size,
-                             max_requests, progress_handler, error_handler,
-                             remote_only)
+                             preserve, recurse, follow_symlinks, sparse,
+                             block_size, max_requests, progress_handler,
+                             error_handler, remote_only)
 
     async def get(self, remotepaths: _SFTPPaths,
                   localpath: Optional[_SFTPPath] = None, *,
                   preserve: bool = False, recurse: bool = False,
-                  follow_symlinks: bool = False, block_size: int = -1,
-                  max_requests: int = -1,
+                  follow_symlinks: bool = False, sparse: bool = True,
+                  block_size: int = -1, max_requests: int = -1,
                   progress_handler: SFTPProgressHandler = None,
                   error_handler: SFTPErrorHandler = None) -> None:
         """Download remote files
@@ -4000,6 +4179,8 @@ class SFTPClient:
                Whether or not to recursively copy directories
            :param follow_symlinks: (optional)
                Whether or not to follow symbolic links
+           :param sparse: (optional)
+               Whether or not to do a sparse file copy where it is supported
            :param block_size: (optional)
                The block size to use for file reads and writes
            :param max_requests: (optional)
@@ -4016,6 +4197,7 @@ class SFTPClient:
            :type preserve: `bool`
            :type recurse: `bool`
            :type follow_symlinks: `bool`
+           :type sparse: `bool`
            :type block_size: `int`
            :type max_requests: `int`
            :type progress_handler: `callable`
@@ -4028,14 +4210,14 @@ class SFTPClient:
 
         await self._begin_copy(self, local_fs, remotepaths, localpath, 'get',
                                False, preserve, recurse, follow_symlinks,
-                               block_size, max_requests, progress_handler,
-                               error_handler)
+                               sparse, block_size, max_requests,
+                               progress_handler, error_handler)
 
     async def put(self, localpaths: _SFTPPaths,
                   remotepath: Optional[_SFTPPath] = None, *,
                   preserve: bool = False, recurse: bool = False,
-                  follow_symlinks: bool = False, block_size: int = -1,
-                  max_requests: int = -1,
+                  follow_symlinks: bool = False, sparse: bool = True,
+                  block_size: int = -1, max_requests: int = -1,
                   progress_handler: SFTPProgressHandler = None,
                   error_handler: SFTPErrorHandler = None) -> None:
         """Upload local files
@@ -4108,6 +4290,8 @@ class SFTPClient:
                Whether or not to recursively copy directories
            :param follow_symlinks: (optional)
                Whether or not to follow symbolic links
+           :param sparse: (optional)
+               Whether or not to do a sparse file copy where it is supported
            :param block_size: (optional)
                The block size to use for file reads and writes
            :param max_requests: (optional)
@@ -4124,6 +4308,7 @@ class SFTPClient:
            :type preserve: `bool`
            :type recurse: `bool`
            :type follow_symlinks: `bool`
+           :type sparse: `bool`
            :type block_size: `int`
            :type max_requests: `int`
            :type progress_handler: `callable`
@@ -4136,14 +4321,14 @@ class SFTPClient:
 
         await self._begin_copy(local_fs, self, localpaths, remotepath, 'put',
                                False, preserve, recurse, follow_symlinks,
-                               block_size, max_requests, progress_handler,
-                               error_handler)
+                               sparse, block_size, max_requests,
+                               progress_handler, error_handler)
 
     async def copy(self, srcpaths: _SFTPPaths,
                    dstpath: Optional[_SFTPPath] = None, *,
                    preserve: bool = False, recurse: bool = False,
-                   follow_symlinks: bool = False, block_size: int = -1,
-                   max_requests: int = -1,
+                   follow_symlinks: bool = False, sparse: bool = True,
+                   block_size: int = -1, max_requests: int = -1,
                    progress_handler: SFTPProgressHandler = None,
                    error_handler: SFTPErrorHandler = None,
                    remote_only: bool = False) -> None:
@@ -4217,6 +4402,8 @@ class SFTPClient:
                Whether or not to recursively copy directories
            :param follow_symlinks: (optional)
                Whether or not to follow symbolic links
+           :param sparse: (optional)
+               Whether or not to do a sparse file copy where it is supported
            :param block_size: (optional)
                The block size to use for file reads and writes
            :param max_requests: (optional)
@@ -4235,6 +4422,7 @@ class SFTPClient:
            :type preserve: `bool`
            :type recurse: `bool`
            :type follow_symlinks: `bool`
+           :type sparse: `bool`
            :type block_size: `int`
            :type max_requests: `int`
            :type progress_handler: `callable`
@@ -4248,14 +4436,14 @@ class SFTPClient:
 
         await self._begin_copy(self, self, srcpaths, dstpath, 'remote copy',
                                False, preserve, recurse, follow_symlinks,
-                               block_size, max_requests, progress_handler,
-                               error_handler, remote_only)
+                               sparse, block_size, max_requests,
+                               progress_handler, error_handler, remote_only)
 
     async def mget(self, remotepaths: _SFTPPaths,
                    localpath: Optional[_SFTPPath] = None, *,
                    preserve: bool = False, recurse: bool = False,
-                   follow_symlinks: bool = False, block_size: int = -1,
-                   max_requests: int = -1,
+                   follow_symlinks: bool = False, sparse: bool = True,
+                   block_size: int = -1, max_requests: int = -1,
                    progress_handler: SFTPProgressHandler = None,
                    error_handler: SFTPErrorHandler = None) -> None:
         """Download remote files with glob pattern match
@@ -4271,14 +4459,14 @@ class SFTPClient:
 
         await self._begin_copy(self, local_fs, remotepaths, localpath, 'mget',
                                True, preserve, recurse, follow_symlinks,
-                               block_size, max_requests, progress_handler,
-                               error_handler)
+                               sparse, block_size, max_requests,
+                               progress_handler, error_handler)
 
     async def mput(self, localpaths: _SFTPPaths,
                    remotepath: Optional[_SFTPPath] = None, *,
                    preserve: bool = False, recurse: bool = False,
-                   follow_symlinks: bool = False, block_size: int = -1,
-                   max_requests: int = -1,
+                   follow_symlinks: bool = False, sparse: bool = True,
+                   block_size: int = -1, max_requests: int = -1,
                    progress_handler: SFTPProgressHandler = None,
                    error_handler: SFTPErrorHandler = None) -> None:
         """Upload local files with glob pattern match
@@ -4294,14 +4482,14 @@ class SFTPClient:
 
         await self._begin_copy(local_fs, self, localpaths, remotepath, 'mput',
                                True, preserve, recurse, follow_symlinks,
-                               block_size, max_requests, progress_handler,
-                               error_handler)
+                               sparse, block_size, max_requests,
+                               progress_handler, error_handler)
 
     async def mcopy(self, srcpaths: _SFTPPaths,
                     dstpath: Optional[_SFTPPath] = None, *,
                     preserve: bool = False, recurse: bool = False,
-                    follow_symlinks: bool = False, block_size: int = -1,
-                    max_requests: int = -1,
+                    follow_symlinks: bool = False, sparse: bool = True,
+                    block_size: int = -1, max_requests: int = -1,
                     progress_handler: SFTPProgressHandler = None,
                     error_handler: SFTPErrorHandler = None,
                     remote_only: bool = False) -> None:
@@ -4318,8 +4506,8 @@ class SFTPClient:
 
         await self._begin_copy(self, self, srcpaths, dstpath, 'remote mcopy',
                                True, preserve, recurse, follow_symlinks,
-                               block_size, max_requests, progress_handler,
-                               error_handler, remote_only)
+                               sparse, block_size, max_requests,
+                               progress_handler, error_handler, remote_only)
 
     async def remote_copy(self, src: _SFTPClientFileOrPath,
                           dst: _SFTPClientFileOrPath, src_offset: int = 0,
@@ -5074,7 +5262,7 @@ class SFTPClient:
 
         """
 
-        return (await self._type(path)) != FILEXFER_TYPE_UNKNOWN
+        return await self._type(path) != FILEXFER_TYPE_UNKNOWN
 
     async def lexists(self, path: _SFTPPath) -> bool:
         """Return if the remote path exists, without following symbolic links
@@ -5087,7 +5275,7 @@ class SFTPClient:
 
         """
 
-        return (await self._type(path, statfunc=self.lstat)) != \
+        return await self._type(path, statfunc=self.lstat) != \
             FILEXFER_TYPE_UNKNOWN
 
     async def getatime(self, path: _SFTPPath) -> Optional[float]:
@@ -5216,7 +5404,7 @@ class SFTPClient:
 
         """
 
-        return (await self._type(path)) == FILEXFER_TYPE_DIRECTORY
+        return await self._type(path) == FILEXFER_TYPE_DIRECTORY
 
     async def isfile(self, path: _SFTPPath) -> bool:
         """Return if the remote path refers to a regular file
@@ -5229,7 +5417,7 @@ class SFTPClient:
 
         """
 
-        return (await self._type(path)) == FILEXFER_TYPE_REGULAR
+        return await self._type(path) == FILEXFER_TYPE_REGULAR
 
     async def islink(self, path: _SFTPPath) -> bool:
         """Return if the remote path refers to a symbolic link
@@ -5242,7 +5430,7 @@ class SFTPClient:
 
         """
 
-        return (await self._type(path, statfunc=self.lstat)) == \
+        return await self._type(path, statfunc=self.lstat) == \
             FILEXFER_TYPE_SYMLINK
 
     async def remove(self, path: _SFTPPath) -> None:
@@ -5713,7 +5901,8 @@ class SFTPServerHandler(SFTPHandler):
         (b'fsync@openssh.com', b'1'),
         (b'lsetstat@openssh.com', b'1'),
         (b'limits@openssh.com', b'1'),
-        (b'copy-data', b'1')]
+        (b'copy-data', b'1'),
+        (b'ranges@asyncssh.com', b'1')]
 
     _attrib_extensions: List[bytes] = []
 
@@ -5740,10 +5929,12 @@ class SFTPServerHandler(SFTPHandler):
                 result = self._server.close(file_obj)
 
                 if inspect.isawaitable(result):
-                    assert result is not None
                     await result
 
-            self._server.exit()
+            result = self._server.exit()
+
+            if inspect.isawaitable(result):
+                await result
 
             self._file_handles = {}
             self._dir_handles = {}
@@ -5816,9 +6007,11 @@ class SFTPServerHandler(SFTPHandler):
                             b''.join(name.encode(self._version)
                                      for name in names) + end)
             elif isinstance(result, SFTPLimits):
-                self.logger.debug1('Sending server limits:')
-                self._log_limits(result)
-                response = result.encode(self._version)
+                result.log(self.logger, 'Sending')
+                response = result.encode()
+            elif isinstance(result, SFTPRanges):
+                result.log(self.logger, 'Sending')
+                response = result.encode()
             else:
                 attrs: _SupportsEncode
 
@@ -5978,7 +6171,6 @@ class SFTPServerHandler(SFTPHandler):
             result = self._server.close(file_obj)
 
             if inspect.isawaitable(result):
-                assert result is not None
                 await result
 
             return
@@ -6117,7 +6309,6 @@ class SFTPServerHandler(SFTPHandler):
         result = self._server.setstat(path, attrs)
 
         if inspect.isawaitable(result):
-            assert result is not None
             await result
 
     async def _process_fsetstat(self, packet: SSHPacket) -> None:
@@ -6138,7 +6329,6 @@ class SFTPServerHandler(SFTPHandler):
             result = self._server.fsetstat(file_obj, attrs)
 
             if inspect.isawaitable(result):
-                assert result is not None
                 await result
         else:
             raise SFTPInvalidHandle('Invalid file handle')
@@ -6178,7 +6368,6 @@ class SFTPServerHandler(SFTPHandler):
                     longname_result = self._server.format_longname(name)
 
                     if inspect.isawaitable(longname_result):
-                        assert longname_result is not None
                         await longname_result
 
                 result.append(name)
@@ -6207,7 +6396,6 @@ class SFTPServerHandler(SFTPHandler):
         result = self._server.remove(path)
 
         if inspect.isawaitable(result):
-            assert result is not None
             await result
 
     async def _process_mkdir(self, packet: SSHPacket) -> None:
@@ -6224,7 +6412,6 @@ class SFTPServerHandler(SFTPHandler):
         result = self._server.mkdir(path, attrs)
 
         if inspect.isawaitable(result):
-            assert result is not None
             await result
 
     async def _process_rmdir(self, packet: SSHPacket) -> None:
@@ -6240,7 +6427,6 @@ class SFTPServerHandler(SFTPHandler):
         result = self._server.rmdir(path)
 
         if inspect.isawaitable(result):
-            assert result is not None
             await result
 
     async def _process_realpath(self, packet: SSHPacket) -> _SFTPNames:
@@ -6338,7 +6524,6 @@ class SFTPServerHandler(SFTPHandler):
             result = self._server.rename(oldpath, newpath)
 
         if inspect.isawaitable(result):
-            assert result is not None
             await result
 
     async def _process_readlink(self, packet: SSHPacket) -> _SFTPNames:
@@ -6378,7 +6563,6 @@ class SFTPServerHandler(SFTPHandler):
         result = self._server.symlink(oldpath, newpath)
 
         if inspect.isawaitable(result):
-            assert result is not None
             await result
 
     async def _process_link(self, packet: SSHPacket) -> None:
@@ -6400,7 +6584,6 @@ class SFTPServerHandler(SFTPHandler):
             result = self._server.link(oldpath, newpath)
 
         if inspect.isawaitable(result):
-            assert result is not None
             await result
 
     async def _process_lock(self, packet: SSHPacket) -> None:
@@ -6422,7 +6605,6 @@ class SFTPServerHandler(SFTPHandler):
             result = self._server.lock(file_obj, offset, length, flags)
 
             if inspect.isawaitable(result): # pragma: no branch
-                assert result is not None
                 await result
         else:
             raise SFTPInvalidHandle('Invalid file handle')
@@ -6444,7 +6626,6 @@ class SFTPServerHandler(SFTPHandler):
             result = self._server.unlock(file_obj, offset, length)
 
             if inspect.isawaitable(result): # pragma: no branch
-                assert result is not None
                 await result
         else:
             raise SFTPInvalidHandle('Invalid file handle')
@@ -6462,7 +6643,6 @@ class SFTPServerHandler(SFTPHandler):
         result = self._server.posix_rename(oldpath, newpath)
 
         if inspect.isawaitable(result):
-            assert result is not None
             await result
 
     async def _process_statvfs(self, packet: SSHPacket) -> _SFTPOSVFSAttrs:
@@ -6517,7 +6697,6 @@ class SFTPServerHandler(SFTPHandler):
         result = self._server.link(oldpath, newpath)
 
         if inspect.isawaitable(result):
-            assert result is not None
             await result
 
     async def _process_fsync(self, packet: SSHPacket) -> None:
@@ -6534,7 +6713,6 @@ class SFTPServerHandler(SFTPHandler):
             result = self._server.fsync(file_obj)
 
             if inspect.isawaitable(result):
-                assert result is not None
                 await result
         else:
             raise SFTPInvalidHandle('Invalid file handle')
@@ -6554,7 +6732,6 @@ class SFTPServerHandler(SFTPHandler):
         result = self._server.lsetstat(path, attrs)
 
         if inspect.isawaitable(result):
-            assert result is not None
             await result
 
     async def _process_limits(self, packet: SSHPacket) -> SFTPLimits:
@@ -6616,6 +6793,38 @@ class SFTPServerHandler(SFTPHandler):
         else:
             raise SFTPInvalidHandle('Invalid file handle')
 
+    async def _process_ranges(self, packet: SSHPacket) -> SFTPRanges:
+        """Process an incoming sparse file ranges request"""
+
+        handle = packet.get_string()
+        offset = packet.get_uint64()
+        length = packet.get_uint64()
+        packet.check_end()
+
+        self.logger.debug1('Received ranges request for handle %s, '
+                           'offset %d, length %d', handle.hex(),
+                           offset, length)
+
+        file_obj = cast(_SFTPFileObj, self._file_handles.get(handle))
+
+        if file_obj:
+            count = 0
+            result: List[Tuple[int, int]] = []
+
+            async for data_range in _request_ranges(file_obj, offset, length):
+                result.append(data_range)
+                count += 1
+
+                if count == _MAX_SPARSE_RANGES:
+                    break
+
+            if result:
+                return SFTPRanges(result, count < _MAX_SPARSE_RANGES)
+            else:
+                raise SFTPEOFError
+        else:
+            raise SFTPInvalidHandle('Invalid file handle')
+
     _packet_handlers: Dict[Union[int, bytes], _SFTPPacketHandler] = {
         FXP_OPEN:                     _process_open,
         FXP_CLOSE:                    _process_close,
@@ -6645,7 +6854,8 @@ class SFTPServerHandler(SFTPHandler):
         b'fsync@openssh.com':         _process_fsync,
         b'lsetstat@openssh.com':      _process_lsetstat,
         b'limits@openssh.com':        _process_limits,
-        b'copy-data':                 _process_copy_data
+        b'copy-data':                 _process_copy_data,
+        b'ranges@asyncssh.com':       _process_ranges
     }
 
     async def run(self) -> None:
@@ -7079,8 +7289,14 @@ class SFTPServer:
             pass
 
         perms = 0o666 if attrs.permissions is None else attrs.permissions
-        return open(_to_local_path(self.map_path(path)), mode, buffering=0,
-                    opener=lambda path, _: os.open(path, flags, perms))
+
+        file_obj = open(_to_local_path(self.map_path(path)), mode, buffering=0,
+                        opener=lambda path, _: os.open(path, flags, perms))
+
+        if mode[0] in 'wx':
+            make_sparse_file(file_obj)
+
+        return file_obj
 
     def open56(self, path: bytes, desired_access: int, flags: int,
                attrs: SFTPAttrs) -> MaybeAwait[object]:
@@ -7176,8 +7392,14 @@ class SFTPServer:
             pass
 
         perms = 0o666 if attrs.permissions is None else attrs.permissions
-        return open(_to_local_path(self.map_path(path)), mode, buffering=0,
-                    opener=lambda path, _: os.open(path, open_flags, perms))
+
+        file_obj = open(_to_local_path(self.map_path(path)), mode, buffering=0,
+                        opener=lambda path, _: os.open(path, open_flags, perms))
+
+        if mode[0] in 'wx':
+            make_sparse_file(file_obj)
+
+        return file_obj
 
     def close(self, file_obj: object) -> MaybeAwait[None]:
         """Close an open file or directory
@@ -7711,6 +7933,12 @@ class LocalFile:
         await self.close()
         return False
 
+    def request_ranges(self, offset: int, length: int) -> \
+            AsyncIterator[Tuple[int, int]]:
+        """Return data ranges containing data in a local file"""
+
+        return _request_ranges(self._file, offset, length)
+
     async def read(self, size: int, offset: int) -> bytes:
         """Read data from the local file"""
 
@@ -7827,7 +8055,13 @@ class LocalFS:
 
         # pylint: disable=unused-argument
 
-        return LocalFile(open(_to_local_path(path), mode))
+        file_obj = open(_to_local_path(path), mode)
+
+        if mode[0] in 'wx':
+            make_sparse_file(file_obj)
+
+        return LocalFile(file_obj)
+
 
 local_fs = LocalFS()
 
@@ -7883,7 +8117,6 @@ class SFTPServerFile:
         result = self._server.close(self._file_obj)
 
         if inspect.isawaitable(result):
-            assert result is not None
             await result
 
 
@@ -7920,7 +8153,6 @@ class SFTPServerFS:
         result = self._server.setstat(path, attrs)
 
         if inspect.isawaitable(result):
-            assert result is not None
             await result
 
     async def _type(self, path: bytes) -> int:
@@ -7939,12 +8171,12 @@ class SFTPServerFS:
     async def exists(self, path: bytes) -> bool:
         """Return if a path exists"""
 
-        return (await self._type(path)) != FILEXFER_TYPE_UNKNOWN
+        return await self._type(path) != FILEXFER_TYPE_UNKNOWN
 
     async def isdir(self, path: bytes) -> bool:
         """Return if the path refers to a directory"""
 
-        return (await self._type(path)) == FILEXFER_TYPE_DIRECTORY
+        return await self._type(path) == FILEXFER_TYPE_DIRECTORY
 
     def scandir(self, path: bytes) -> AsyncIterator[SFTPName]:
         """Return names and attributes of the files in a directory"""
@@ -7957,7 +8189,6 @@ class SFTPServerFS:
         result = self._server.mkdir(path, SFTPAttrs())
 
         if inspect.isawaitable(result):
-            assert result is not None
             await result
 
     @async_context_manager
@@ -7994,13 +8225,25 @@ async def start_sftp_client(conn: 'SSHCl
     return SFTPClient(handler, path_encoding, path_errors)
 
 
-def run_sftp_server(sftp_server: SFTPServer, reader: 'SSHReader[bytes]',
-                    writer: 'SSHWriter[bytes]',
-                    sftp_version: int) -> Awaitable[None]:
-    """Return a handler for an SFTP server session"""
+async def _sftp_handler(sftp_server: MaybeAwait[SFTPServer],
+                        reader: 'SSHReader[bytes]',
+                        writer: 'SSHWriter[bytes]',
+                        sftp_version: int) -> None:
+    """Run an SFTP server to handle this request"""
+
+    if inspect.isawaitable(sftp_server):
+        sftp_server = await sftp_server
 
     handler = SFTPServerHandler(sftp_server, reader, writer, sftp_version)
 
-    handler.logger.info('Starting SFTP server')
+    await handler.run()
+
+
+def run_sftp_server(sftp_server: MaybeAwait[SFTPServer],
+                    reader: 'SSHReader[bytes]', writer: 'SSHWriter[bytes]',
+                    sftp_version: int) -> Awaitable[None]:
+    """Return a handler for an SFTP server session"""
+
+    reader.logger.info('Starting SFTP server')
 
-    return handler.run()
+    return _sftp_handler(sftp_server, reader, writer, sftp_version)
diff -pruN 2.20.0-1/asyncssh/stream.py 2.21.0-1/asyncssh/stream.py
--- 2.20.0-1/asyncssh/stream.py	2025-02-17 18:29:16.000000000 +0000
+++ 2.21.0-1/asyncssh/stream.py	2025-05-03 13:32:15.000000000 +0000
@@ -1,4 +1,4 @@
-# Copyright (c) 2013-2024 by Ron Frederick <ronf@timeheart.net> and others.
+# Copyright (c) 2013-2025 by Ron Frederick <ronf@timeheart.net> and others.
 #
 # This program and the accompanying materials are made available under
 # the terms of the Eclipse Public License v2.0 which accompanies this
@@ -62,7 +62,7 @@ SSHServerSessionFactory = Callable[['SSH
                                     'SSHWriter'], MaybeAwait[None]]
 _OptServerSessionFactory = Optional[SSHServerSessionFactory]
 
-SFTPServerFactory = Callable[['SSHChannel[bytes]'], SFTPServer]
+SFTPServerFactory = Callable[['SSHChannel[bytes]'], MaybeAwait[SFTPServer]]
 _OptSFTPServerFactory = Optional[SFTPServerFactory]
 
 
@@ -705,7 +705,7 @@ class SSHServerStreamSession(SSHStreamSe
         self._sftp_version = sftp_version
         self._allow_scp = allow_scp and bool(sftp_factory)
 
-    def _init_sftp_server(self) -> SFTPServer:
+    def _init_sftp_server(self) -> MaybeAwait[SFTPServer]:
         """Initialize an SFTP server for this stream to use"""
 
         assert self._chan is not None
@@ -771,7 +771,6 @@ class SSHServerStreamSession(SSHStreamSe
 
         if inspect.isawaitable(handler):
             assert self._conn is not None
-            assert handler is not None
             self._conn.create_task(handler, stdin.logger)
 
     def exception_received(self, exc: Exception) -> None:
@@ -824,7 +823,6 @@ class SSHSocketStreamSession(SSHStreamSe
 
             if inspect.isawaitable(handler):
                 assert self._conn is not None
-                assert handler is not None
                 self._conn.create_task(handler, reader.logger)
 
 
diff -pruN 2.20.0-1/asyncssh/version.py 2.21.0-1/asyncssh/version.py
--- 2.20.0-1/asyncssh/version.py	2025-02-17 18:29:16.000000000 +0000
+++ 2.21.0-1/asyncssh/version.py	2025-05-03 13:32:15.000000000 +0000
@@ -26,4 +26,4 @@ __author_email__ = 'ronf@timeheart.net'
 
 __url__ = 'http://asyncssh.timeheart.net'
 
-__version__ = '2.20.0'
+__version__ = '2.21.0'
diff -pruN 2.20.0-1/debian/changelog 2.21.0-1/debian/changelog
--- 2.20.0-1/debian/changelog	2025-02-24 18:29:44.000000000 +0000
+++ 2.21.0-1/debian/changelog	2025-08-13 10:05:27.000000000 +0000
@@ -1,3 +1,10 @@
+python-asyncssh (2.21.0-1) unstable; urgency=medium
+
+  * Team upload.
+  * New upstream release.
+
+ -- Colin Watson <cjwatson@debian.org>  Wed, 13 Aug 2025 11:05:27 +0100
+
 python-asyncssh (2.20.0-1) unstable; urgency=medium
 
   * Team upload.
diff -pruN 2.20.0-1/debian/copyright 2.21.0-1/debian/copyright
--- 2.20.0-1/debian/copyright	2025-02-24 18:29:44.000000000 +0000
+++ 2.21.0-1/debian/copyright	2025-08-13 10:05:27.000000000 +0000
@@ -4,7 +4,7 @@ Upstream-Contact: Ron Frederick <ronf@ti
 Source: https://github.com/ronf/asyncssh
 
 Files: *
-Copyright: Copyright (c) 2013-2024 by Ron Frederick <ronf@timeheart.net> and others.
+Copyright: Copyright (c) 2013-2025 by Ron Frederick <ronf@timeheart.net> and others.
 License: EPL-1.0
 
 Files: debian/*
diff -pruN 2.20.0-1/docs/changes.rst 2.21.0-1/docs/changes.rst
--- 2.20.0-1/docs/changes.rst	2025-02-17 18:29:16.000000000 +0000
+++ 2.21.0-1/docs/changes.rst	2025-05-03 13:32:15.000000000 +0000
@@ -3,6 +3,44 @@
 Change Log
 ==========
 
+Release 2.21.0 (2 May 2025)
+---------------------------
+
+* Added sparse file support for SFTP, allowing file copying which
+  automatically skips over any "holes" in a source file, transferring
+  only the data ranges which are actually present.
+
+* Added support for applications to request that session, connection,
+  or TUN/TAP requests arriving on an SSHServerConnection be forwarded
+  out some other established SSHClientConnection. Callback methods on
+  SSHServer which decide how to handle these requests can now return
+  an SSHClientConnection to set up this tunneling, instead of having
+  to accept the request and implement their own forwarding logic.
+
+* Further hardened the SSH key exchange process to make AsyncSSH
+  more strict when accepting messages during key exchange. Thanks
+  go to Fabian Bäumer and Marcus Brinkmann for identifying potential
+  issues here.
+
+* Added support for the auth_completed callback in SSHServer to
+  be either a callable or a coroutine, allowing async operations
+  to be performed when user authentication completes successfully,
+  prior to accepting session requests.
+
+* Added support for the sftp_factory config argument be either a
+  callable or a coroutine, allowing async operations to be performed
+  when starting up a new SFTP server session.
+
+* Fixed a bug where the exit() method of SFTPServer didn't handle
+  being declared as a coroutine. Thanks go to C. R. Oldham for
+  reporting this issue.
+
+* Improved handling of exceptions in connection_lost() callbacks.
+  Exceptions in connection_lost() will now be reported in the
+  debug log, but other cleanup code in AsyncSSH will continue,
+  ignoring those exceptions. Thanks go to Danil Slinchuk for
+  reporting this issue.
+
 Release 2.20.0 (17 Feb 2025)
 ----------------------------
 
diff -pruN 2.20.0-1/tests/test_auth.py 2.21.0-1/tests/test_auth.py
--- 2.20.0-1/tests/test_auth.py	2025-02-17 18:29:16.000000000 +0000
+++ 2.21.0-1/tests/test_auth.py	2025-05-03 13:32:15.000000000 +0000
@@ -1,4 +1,4 @@
-# Copyright (c) 2015-2022 by Ron Frederick <ronf@timeheart.net> and others.
+# Copyright (c) 2015-2025 by Ron Frederick <ronf@timeheart.net> and others.
 #
 # This program and the accompanying materials are made available under
 # the terms of the Eclipse Public License v2.0 which accompanies this
@@ -326,7 +326,7 @@ class _AuthServerStub(_AuthConnectionStu
         self.send_userauth_packet(MSG_USERAUTH_FAILURE, NameList([]),
                                   Boolean(partial_success))
 
-    def send_userauth_success(self):
+    async def send_userauth_success(self):
         """Send a user authentication success response"""
 
         self._auth = None
diff -pruN 2.20.0-1/tests/test_channel.py 2.21.0-1/tests/test_channel.py
--- 2.20.0-1/tests/test_channel.py	2025-02-17 18:29:16.000000000 +0000
+++ 2.21.0-1/tests/test_channel.py	2025-05-03 13:32:15.000000000 +0000
@@ -247,6 +247,15 @@ class _PTYServerSession(asyncssh.SSHServ
         chan.close()
 
 
+class _ClientSessionCleanupError(asyncssh.SSHClientSession):
+    """Test of exception during client session cleanup"""
+
+    def connection_lost(self, exc):
+        """Raise an error when a client session is cleaned up"""
+
+        raise RuntimeError('Exception in session cleanup test')
+
+
 class _ChannelServer(Server):
     """Server for testing the AsyncSSH channel API"""
 
@@ -1761,6 +1770,13 @@ class _TestChannel(ServerTestCase):
             await chan.wait_closed()
             self.assertEqual(session.exit_status, 255)
 
+    @asynctest
+    async def test_client_session_cleanup_error(self):
+        """Test error in client session cleanup"""
+
+        async with self.connect() as conn:
+            await conn.create_session(_ClientSessionCleanupError)
+
 
 class _TestChannelNoPTY(ServerTestCase):
     """Unit tests for AsyncSSH channel module with PTYs disallowed"""
diff -pruN 2.20.0-1/tests/test_connection.py 2.21.0-1/tests/test_connection.py
--- 2.20.0-1/tests/test_connection.py	2025-02-17 18:29:16.000000000 +0000
+++ 2.21.0-1/tests/test_connection.py	2025-05-03 13:32:15.000000000 +0000
@@ -282,6 +282,17 @@ class _InternalErrorClient(asyncssh.SSHC
         raise RuntimeError('Exception handler test')
 
 
+class _ClientCleanupError(asyncssh.SSHClient):
+    """Test of exception during client cleanup"""
+
+    def connection_lost(self, exc):
+        """Raise an error when a client is cleaned up"""
+
+        # pylint: disable=unused-argument
+
+        raise RuntimeError('Exception in cleanup test')
+
+
 class _TunnelServer(Server):
     """Allow forwarding to test server host key request tunneling"""
 
@@ -1719,6 +1730,13 @@ class _TestConnection(ServerTestCase):
         with self.assertRaises(RuntimeError):
             await self.create_connection(_InternalErrorClient)
 
+    @asynctest
+    async def test_client_cleanup_error(self):
+        """Test error in client cleanup"""
+
+        async with self.connect(client_factory=_ClientCleanupError):
+            pass
+
 
 @patch_extra_kex
 class _TestConnectionNoStrictKex(ServerTestCase):
diff -pruN 2.20.0-1/tests/test_connection_auth.py 2.21.0-1/tests/test_connection_auth.py
--- 2.20.0-1/tests/test_connection_auth.py	2025-02-17 18:29:16.000000000 +0000
+++ 2.21.0-1/tests/test_connection_auth.py	2025-05-03 13:32:15.000000000 +0000
@@ -1,4 +1,4 @@
-# Copyright (c) 2016-2022 by Ron Frederick <ronf@timeheart.net> and others.
+# Copyright (c) 2016-2025 by Ron Frederick <ronf@timeheart.net> and others.
 #
 # This program and the accompanying materials are made available under
 # the terms of the Eclipse Public License v2.0 which accompanies this
@@ -75,6 +75,9 @@ class _NullServer(Server):
 
         return False
 
+    async def auth_completed(self):
+        """Handle client authentication request"""
+
 
 class _HostBasedServer(Server):
     """Server for testing host-based authentication"""
diff -pruN 2.20.0-1/tests/test_forward.py 2.21.0-1/tests/test_forward.py
--- 2.20.0-1/tests/test_forward.py	2025-02-17 18:29:16.000000000 +0000
+++ 2.21.0-1/tests/test_forward.py	2025-05-03 13:32:15.000000000 +0000
@@ -229,6 +229,25 @@ class _UNIXAsyncConnectionServer(_UNIXCo
             return listen_path != 'fail'
 
 
+class _UpstreamForwardingServer(Server):
+    """Server for testing forwarding between SSH connections"""
+
+    def __init__(self, upstream_conn):
+        super().__init__()
+
+        self._upstream_conn = upstream_conn
+
+    def connection_requested(self, dest_host, dest_port, orig_host, orig_port):
+        """Handle a request to create a new connection"""
+
+        return self._upstream_conn
+
+    def unix_connection_requested(self, dest_path):
+        """Handle a request to create a new UNIX domain connection"""
+
+        return self._upstream_conn
+
+
 class _CheckForwarding(ServerTestCase):
     """Utility functions for AsyncSSH forwarding unit tests"""
 
@@ -296,8 +315,8 @@ class _TestTCPForwarding(_CheckForwardin
     async def start_server(cls):
         """Start an SSH server which supports TCP connection forwarding"""
 
-        return (await cls.create_server(
-            _TCPConnectionServer, authorized_client_keys='authorized_keys'))
+        return await cls.create_server(
+            _TCPConnectionServer, authorized_client_keys='authorized_keys')
 
     async def _check_connection(self, conn, dest_host='',
                                 dest_port=7, **kwargs):
@@ -877,6 +896,25 @@ class _TestTCPForwarding(_CheckForwardin
                 self.assertEqual(pkttype, asyncssh.MSG_REQUEST_FAILURE)
 
     @asynctest
+    async def test_upstream_forward_local_port(self):
+        """Test upstream forwarding of a local port"""
+
+        def upstream_server():
+            """Return a server capable of forwarding between SSH connections"""
+
+            return _UpstreamForwardingServer(upstream_conn)
+
+        async with self.connect() as upstream_conn:
+            upstream_listener = await self.create_server(upstream_server)
+            upstream_port = upstream_listener.get_port()
+
+            async with self.connect('127.0.0.1', upstream_port) as conn:
+                async with conn.forward_local_port('', 0, '', 7) as listener:
+                    await self._check_local_connection(listener.get_port())
+
+            upstream_listener.close()
+
+    @asynctest
     async def test_add_channel_after_close(self):
         """Test opening a connection after a close"""
 
@@ -963,8 +1001,8 @@ class _TestUNIXForwarding(_CheckForwardi
     async def start_server(cls):
         """Start an SSH server which supports UNIX connection forwarding"""
 
-        return (await cls.create_server(
-            _UNIXConnectionServer, authorized_client_keys='authorized_keys'))
+        return await cls.create_server(
+            _UNIXConnectionServer, authorized_client_keys='authorized_keys')
 
     async def _check_unix_connection(self, conn, dest_path='/echo', **kwargs):
         """Open a UNIX connection and test if an input line is echoed back"""
@@ -1233,6 +1271,25 @@ class _TestUNIXForwarding(_CheckForwardi
 
                 self.assertEqual(pkttype, asyncssh.MSG_REQUEST_FAILURE)
 
+    @asynctest
+    async def test_upstream_forward_local_path(self):
+        """Test upstream forwarding of a local path"""
+
+        def upstream_server():
+            """Return a server capable of forwarding between SSH connections"""
+
+            return _UpstreamForwardingServer(upstream_conn)
+
+        async with self.connect() as upstream_conn:
+            upstream_listener = await self.create_server(upstream_server)
+            upstream_port = upstream_listener.get_port()
+
+            async with self.connect('127.0.0.1', upstream_port) as conn:
+                async with conn.forward_local_path('local', '/echo'):
+                    await self._check_local_unix_connection('local')
+
+            upstream_listener.close()
+
 
 class _TestAsyncUNIXForwarding(_TestUNIXForwarding):
     """Unit tests for AsyncSSH UNIX connection forwarding with async return"""
@@ -1253,8 +1310,8 @@ class _TestSOCKSForwarding(_CheckForward
     async def start_server(cls):
         """Start an SSH server which supports TCP connection forwarding"""
 
-        return (await cls.create_server(
-            _TCPConnectionServer, authorized_client_keys='authorized_keys'))
+        return await cls.create_server(
+            _TCPConnectionServer, authorized_client_keys='authorized_keys')
 
     async def _check_early_error(self, reader, writer, data):
         """Check errors in the initial SOCKS message"""
diff -pruN 2.20.0-1/tests/test_kex.py 2.21.0-1/tests/test_kex.py
--- 2.20.0-1/tests/test_kex.py	2025-02-17 18:29:16.000000000 +0000
+++ 2.21.0-1/tests/test_kex.py	2025-05-03 13:32:15.000000000 +0000
@@ -1,4 +1,4 @@
-# Copyright (c) 2015-2020 by Ron Frederick <ronf@timeheart.net> and others.
+# Copyright (c) 2015-2025 by Ron Frederick <ronf@timeheart.net> and others.
 #
 # This program and the accompanying materials are made available under
 # the terms of the Eclipse Public License v2.0 which accompanies this
@@ -35,8 +35,8 @@ from asyncssh.kex_dh import MSG_KEXDH_IN
 from asyncssh.kex_dh import MSG_KEX_DH_GEX_REQUEST, MSG_KEX_DH_GEX_GROUP
 from asyncssh.kex_dh import MSG_KEX_DH_GEX_INIT, MSG_KEX_DH_GEX_REPLY, _KexDHGex
 from asyncssh.kex_dh import MSG_KEX_ECDH_INIT, MSG_KEX_ECDH_REPLY
-from asyncssh.kex_dh import MSG_KEXGSS_INIT, MSG_KEXGSS_COMPLETE
-from asyncssh.kex_dh import MSG_KEXGSS_ERROR
+from asyncssh.kex_dh import MSG_KEXGSS_INIT, MSG_KEXGSS_HOSTKEY
+from asyncssh.kex_dh import MSG_KEXGSS_COMPLETE, MSG_KEXGSS_ERROR
 from asyncssh.kex_rsa import MSG_KEXRSA_PUBKEY, MSG_KEXRSA_SECRET
 from asyncssh.kex_rsa import MSG_KEXRSA_DONE
 from asyncssh.gss import GSSClient, GSSServer
@@ -51,12 +51,13 @@ from .util import AsyncTestCase, Connect
 class _KexConnectionStub(ConnectionStub):
     """Connection stub class to test key exchange"""
 
-    def __init__(self, alg, gss, peer, server=False):
+    def __init__(self, alg, gss, duplicate=0, peer=None, server=False):
         super().__init__(peer, server)
 
         self._gss = gss
         self._key_waiter = asyncio.Future()
 
+        self._duplicate = duplicate
         self._kex = get_kex(self, alg)
 
     async def start(self):
@@ -104,6 +105,14 @@ class _KexConnectionStub(ConnectionStub)
 
         return self._gss
 
+    def send_packet(self, pkttype, *args, **kwargs):
+        """Duplicate sending packets of a specific type"""
+
+        super().send_packet(pkttype, *args)
+
+        if pkttype == self._duplicate:
+            super().send_packet(pkttype, *args, **kwargs)
+
     async def simulate_dh_init(self, e):
         """Simulate receiving a DH init packet"""
 
@@ -176,21 +185,21 @@ class _KexClientStub(_KexConnectionStub)
     """Stub class for client connection"""
 
     @classmethod
-    def make_pair(cls, alg, gss_host=None):
+    def make_pair(cls, alg, gss_host=None, duplicate=0):
         """Make a client and server connection pair to test key exchange"""
 
-        client_conn = cls(alg, gss_host)
+        client_conn = cls(alg, gss_host, duplicate)
         return client_conn, client_conn.get_peer()
 
-    def __init__(self, alg, gss_host):
-        server_conn = _KexServerStub(alg, gss_host, self)
+    def __init__(self, alg, gss_host, duplicate):
+        server_conn = _KexServerStub(alg, gss_host, duplicate, peer=self)
 
         if gss_host:
             gss = GSSClient(gss_host, None, 'delegate' in gss_host)
         else:
             gss = None
 
-        super().__init__(alg, gss, server_conn)
+        super().__init__(alg, gss, duplicate, peer=server_conn)
 
     def connection_lost(self, exc):
         """Handle the closing of a connection"""
@@ -211,9 +220,9 @@ class _KexClientStub(_KexConnectionStub)
 class _KexServerStub(_KexConnectionStub):
     """Stub class for server connection"""
 
-    def __init__(self, alg, gss_host, peer):
+    def __init__(self, alg, gss_host, duplicate, peer):
         gss = GSSServer(gss_host, None) if gss_host else None
-        super().__init__(alg, gss, peer, True)
+        super().__init__(alg, gss, duplicate, peer, True)
 
         if gss_host and 'no_host_key' in gss_host:
             self._server_host_key = None
@@ -381,6 +390,21 @@ class _TestKex(AsyncTestCase):
         client_conn.close()
         server_conn.close()
 
+    @asynctest
+    async def test_dh_gex_multiple_messages(self):
+        """Unit test duplicate messages in DH group exchange"""
+
+        for pkttype in (MSG_KEX_DH_GEX_REQUEST, MSG_KEX_DH_GEX_GROUP):
+            client_conn, server_conn = _KexClientStub.make_pair(
+                b'diffie-hellman-group-exchange-sha1', duplicate=pkttype)
+
+            with self.assertRaises(asyncssh.ProtocolError):
+                await client_conn.start()
+                await client_conn.get_key()
+
+            client_conn.close()
+            server_conn.close()
+
     @unittest.skipUnless(gss_available, 'GSS not available')
     @asynctest
     async def test_gss_errors(self):
@@ -393,6 +417,15 @@ class _TestKex(AsyncTestCase):
             with self.assertRaises(asyncssh.ProtocolError):
                 await client_conn.process_packet(Byte(MSG_KEXGSS_INIT))
 
+        with self.subTest('Host key sent to server'):
+            with self.assertRaises(asyncssh.ProtocolError):
+                await server_conn.process_packet(Byte(MSG_KEXGSS_HOSTKEY))
+
+        with self.subTest('Host key sent twice to client'):
+            with self.assertRaises(asyncssh.ProtocolError):
+                await client_conn.process_packet(Byte(MSG_KEXGSS_HOSTKEY))
+                await client_conn.process_packet(Byte(MSG_KEXGSS_HOSTKEY))
+
         with self.subTest('Complete sent to server'):
             with self.assertRaises(asyncssh.ProtocolError):
                 await server_conn.process_packet(Byte(MSG_KEXGSS_COMPLETE))
diff -pruN 2.20.0-1/tests/test_sftp.py 2.21.0-1/tests/test_sftp.py
--- 2.20.0-1/tests/test_sftp.py	2025-02-17 18:29:16.000000000 +0000
+++ 2.21.0-1/tests/test_sftp.py	2025-05-03 13:32:15.000000000 +0000
@@ -1,4 +1,4 @@
-# Copyright (c) 2015-2024 by Ron Frederick <ronf@timeheart.net> and others.
+# Copyright (c) 2015-2025 by Ron Frederick <ronf@timeheart.net> and others.
 #
 # This program and the accompanying materials are made available under
 # the terms of the Eclipse Public License v2.0 which accompanies this
@@ -67,6 +67,8 @@ from asyncssh import FILEXFER_TYPE_FIFO
 from asyncssh import FILEXFER_ATTR_BITS_READONLY, FILEXFER_ATTR_KNOWN_TEXT
 from asyncssh import FX_OK, scp
 
+from asyncssh.misc import make_sparse_file
+
 from asyncssh.packet import SSHPacket, String, UInt32
 
 from asyncssh.sftp import SAFE_SFTP_READ_LEN, SAFE_SFTP_WRITE_LEN
@@ -615,6 +617,11 @@ class _AsyncSFTPServer(SFTPServer):
 
         super().fsync(file_obj)
 
+    async def exit(self):
+        """Shut down this SFTP server"""
+
+        super().exit()
+
 
 class _CheckSFTP(ServerTestCase):
     """Utility functions for AsyncSSH SFTP unit tests"""
@@ -632,7 +639,7 @@ class _CheckSFTP(ServerTestCase):
         except OSError: # pragma: no cover
             cls._symlink_supported = False
 
-    def _create_file(self, name, data=(), mode=None, utime=None):
+    def _create_file(self, name, data=(), offsets=(0,), mode=None, utime=None):
         """Create a test file"""
 
         if data == ():
@@ -641,7 +648,11 @@ class _CheckSFTP(ServerTestCase):
         binary = 'b' if isinstance(data, bytes) else ''
 
         with open(name, 'w' + binary) as f:
-            f.write(data)
+            make_sparse_file(f)
+
+            for offset in offsets:
+                f.seek(offset)
+                f.write(data)
 
         if mode is not None:
             os.chmod(name, mode)
@@ -659,6 +670,7 @@ class _CheckSFTP(ServerTestCase):
 
         self.assertEqual(stat.S_IMODE(attrs1.st_mode),
                          stat.S_IMODE(attrs2.st_mode))
+        self.assertEqual(attrs1.st_size, attrs2.st_size)
         self.assertEqual(int(attrs1.st_mtime), int(attrs2.st_mtime))
 
         if check_atime:
@@ -675,6 +687,22 @@ class _CheckSFTP(ServerTestCase):
             with open(name2, 'rb') as file2:
                 self.assertEqual(file1.read(), file2.read())
 
+    async def _check_sparse_file(self, name1, name2):
+        """Check if two sparse files are equal"""
+
+        size1 = os.stat(name1).st_size
+        size2 = os.stat(name2).st_size
+        self.assertEqual(size1, size2)
+
+        with open(name1, 'rb') as file1:
+            with open(name2, 'rb') as file2:
+                ranges1 = [range async for range in
+                           LocalFile(file1).request_ranges(0, size1)]
+                ranges2 = [range async for range in
+                           LocalFile(file2).request_ranges(0, size2)]
+
+                self.assertEqual(ranges1, ranges2)
+
     def _check_stat(self, sftp_stat, local_stat):
         """Check if file attributes are equal"""
 
@@ -754,6 +782,47 @@ class _TestSFTP(_CheckSFTP):
                         remove('src dst')
 
     @sftp_test
+    async def test_sparse_copy(self, sftp):
+        """Test putting a sparse file over SFTP"""
+
+        for method in ('get', 'put', 'copy'):
+            with self.subTest(method=method):
+                try:
+                    self._create_file(
+                        'src', offsets=(i*1024*1024 for i in
+                                        range(24, 3840, 24)))
+                    await getattr(sftp, method)('src', 'dst')
+                    await self._check_sparse_file('src', 'dst')
+                finally:
+                    remove('src dst')
+
+    @sftp_test
+    async def test_empty_request_range(self, sftp):
+        """Test getting ranges from an empty file"""
+
+        try:
+            self._create_file('file', data=b'')
+
+            async with sftp.open('file', 'rb') as f:
+                result = [data_range async for data_range in
+                          f.request_ranges(0, 0)]
+                self.assertEqual(result, [])
+        finally:
+            remove('file')
+
+    @sftp_test
+    async def test_nonsparse_put(self, sftp):
+        """Test putting a sparse file over SFTP with sparse mode disabled"""
+
+        try:
+            self._create_file(
+                'src', offsets=(i*1024*1024 for i in range(24, 72, 24)))
+            await sftp.put('src', 'dst', sparse=False)
+            self._check_file('src', 'dst')
+        finally:
+            remove('src dst')
+
+    @sftp_test
     async def test_copy_max_requests(self, sftp):
         """Test copying a file over SFTP with max requests set"""
 
@@ -779,7 +848,7 @@ class _TestSFTP(_CheckSFTP):
                 with self.subTest(method=method):
                     try:
                         self._create_file('src')
-                        await sftp.copy('src', 'dst')
+                        await getattr(sftp, method)('src', 'dst')
                         self._check_file('src', 'dst')
                     finally:
                         remove('src dst')
@@ -3283,6 +3352,9 @@ class _TestSFTP(_CheckSFTP):
             f = await sftp.open('file')
 
             with self.assertRaises(SFTPFailure):
+                _ = [_ async for _ in f.request_ranges(0, 0)]
+
+            with self.assertRaises(SFTPFailure):
                 await f.read()
 
             with self.assertRaises(SFTPFailure):
@@ -4149,10 +4221,33 @@ class _TestSFTPCallable(_CheckSFTP):
     async def start_server(cls):
         """Start an SFTP server using a callable"""
 
-        def sftp_factory(conn):
+        def sftp_factory(chan):
+            """Return an SFTP server"""
+
+            return SFTPServer(chan)
+
+        return await cls.create_server(sftp_factory=sftp_factory)
+
+    @sftp_test
+    async def test_stat(self, sftp):
+        """Test getting attributes on a file"""
+
+        # pylint: disable=no-self-use
+
+        await sftp.stat('.')
+
+
+class _TestSFTPCoroutine(_CheckSFTP):
+    """Unit tests for AsyncSSH SFTP factory being a coroutine"""
+
+    @classmethod
+    async def start_server(cls):
+        """Start an SFTP server using a coroutine"""
+
+        async def sftp_factory(chan):
             """Return an SFTP server"""
 
-            return SFTPServer(conn)
+            return SFTPServer(chan)
 
         return await cls.create_server(sftp_factory=sftp_factory)
 
@@ -4537,7 +4632,7 @@ class _TestSFTPEOFDuringCopy(_CheckSFTP)
             self._create_file('src', 8*1024*1024*'\0')
 
             with self.assertRaises(SFTPFailure):
-                await sftp.get('src', 'dst')
+                await sftp.get('src', 'dst', sparse=False)
         finally:
             remove('src dst')
 
@@ -5501,6 +5596,22 @@ class _TestSCPAsync(_TestSCP):
                                        allow_scp=True)
 
 
+class _TestSCPCoroutine(_TestSCP):
+    """Unit test for AsyncSSH SCP with the SFTP factory being a coroutine"""
+
+    @classmethod
+    async def start_server(cls):
+        """Start an SFTP server with async callbacks"""
+
+        async def sftp_factory(chan):
+            """Return an SFTP server"""
+
+            return SFTPServer(chan)
+
+        return await cls.create_server(sftp_factory=sftp_factory,
+                                       allow_scp=True)
+
+
 class _TestSCPAttrs(_CheckSCP):
     """Unit test for SCP with SFTP server returning SFTPAttrs"""
 
diff -pruN 2.20.0-1/tests/test_stream.py 2.21.0-1/tests/test_stream.py
--- 2.20.0-1/tests/test_stream.py	2025-02-17 18:29:16.000000000 +0000
+++ 2.21.0-1/tests/test_stream.py	2025-05-03 13:32:15.000000000 +0000
@@ -95,6 +95,20 @@ class _StreamServer(Server):
             return False
 
 
+class _UpstreamForwardingServer(Server):
+    """Server for testing forwarding between SSH connections"""
+
+    def __init__(self, upstream_conn):
+        super().__init__()
+
+        self._upstream_conn = upstream_conn
+
+    def session_requested(self):
+        """Handle a request to create a new session"""
+
+        return self._upstream_conn
+
+
 class _TestStream(ServerTestCase):
     """Unit tests for AsyncSSH stream API"""
 
@@ -129,9 +143,9 @@ class _TestStream(ServerTestCase):
         self.assertEqual(data, stdout_data)
         self.assertEqual(data, stderr_data)
 
-        await stdin.channel.wait_closed()
         await stdin.drain()
         stdin.close()
+        await stdin.channel.wait_closed()
 
 
     @asynctest
@@ -142,6 +156,24 @@ class _TestStream(ServerTestCase):
             await self._check_session(conn)
 
     @asynctest
+    async def test_upstream_shell(self):
+        """Test upstream forwarding of a shell request"""
+
+        def upstream_server():
+            """Return a server capable of forwarding between SSH connections"""
+
+            return _UpstreamForwardingServer(upstream_conn)
+
+        async with self.connect() as upstream_conn:
+            upstream_listener = await self.create_server(upstream_server)
+            upstream_port = upstream_listener.get_port()
+
+            async with self.connect('127.0.0.1', upstream_port) as conn:
+                await self._check_session(conn)
+
+            upstream_listener.close()
+
+    @asynctest
     async def test_shell_failure(self):
         """Test failure to start a shell"""
 
diff -pruN 2.20.0-1/tests/test_tuntap.py 2.21.0-1/tests/test_tuntap.py
--- 2.20.0-1/tests/test_tuntap.py	2025-02-17 18:29:16.000000000 +0000
+++ 2.21.0-1/tests/test_tuntap.py	2025-05-03 13:32:15.000000000 +0000
@@ -368,7 +368,29 @@ class _TunTapServer(Server):
     def tap_requested(self, unit):
         """Handle TAP requests"""
 
-        return True
+        if unit == 33:
+            return _EchoSession()
+        else:
+            return True
+
+
+class _UpstreamForwardingServer(Server):
+    """Server for testing forwarding between SSH connections"""
+
+    def __init__(self, upstream_conn):
+        super().__init__()
+
+        self._upstream_conn = upstream_conn
+
+    def tun_requested(self, unit):
+        """Handle a request to create a new layer 3 tunnel"""
+
+        return self._upstream_conn
+
+    def tap_requested(self, unit):
+        """Handle a request to create a new layer 2 tunnel"""
+
+        return self._upstream_conn
 
 
 @skipIf(sys.platform == 'win32', 'skip TUN/TAP tests on Windows')
@@ -659,6 +681,42 @@ class _TestTunTap(ServerTestCase):
             await self._check_tuntap_echo(conn.open_tun(33))
 
     @asynctest
+    async def test_upstream_open_tun_echo_session(self):
+        """Test an echo session on a forwarded layer 3 tunnel"""
+
+        def upstream_server():
+            """Return a server capable of forwarding between SSH connections"""
+
+            return _UpstreamForwardingServer(upstream_conn)
+
+        async with self.connect() as upstream_conn:
+            upstream_listener = await self.create_server(upstream_server)
+            upstream_port = upstream_listener.get_port()
+
+            async with self.connect('127.0.0.1', upstream_port) as conn:
+                await self._check_tuntap_echo(conn.open_tun(33))
+
+            upstream_listener.close()
+
+    @asynctest
+    async def test_upstream_open_tap_echo_session(self):
+        """Test an echo session on a forwarded layer 2 tunnel"""
+
+        def upstream_server():
+            """Return a server capable of forwarding between SSH connections"""
+
+            return _UpstreamForwardingServer(upstream_conn)
+
+        async with self.connect() as upstream_conn:
+            upstream_listener = await self.create_server(upstream_server)
+            upstream_port = upstream_listener.get_port()
+
+            async with self.connect('127.0.0.1', upstream_port) as conn:
+                await self._check_tuntap_echo(conn.open_tap(33))
+
+            upstream_listener.close()
+
+    @asynctest
     async def test_open_tun_echo_session_channel(self):
         """Test an echo session & channel on a layer 3 tunnel"""
 
diff -pruN 2.20.0-1/tests/util.py 2.21.0-1/tests/util.py
--- 2.20.0-1/tests/util.py	2025-02-17 18:29:16.000000000 +0000
+++ 2.21.0-1/tests/util.py	2025-05-03 13:32:15.000000000 +0000
@@ -448,8 +448,6 @@ class AsyncTestCase(TempDirTestCase):
         else:
             cls.loop = asyncio.new_event_loop()
 
-        asyncio.set_event_loop(cls.loop)
-
         try:
             cls.loop.run_until_complete(cls.asyncSetUpClass())
         except AttributeError:
