Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 43 additions & 7 deletions src/dstack/_internal/cli/commands/attach.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
print_finished_message,
)
from dstack._internal.cli.utils.common import console, get_start_time
from dstack._internal.cli.utils.rich import MultiItemStatus
from dstack._internal.cli.utils.run import get_runs_table
from dstack._internal.core.consts import DSTACK_RUNNER_HTTP_PORT
from dstack._internal.core.errors import CLIError
from dstack._internal.core.models.runs import RunStatus
from dstack._internal.core.services.ssh.ports import PortUsedError
from dstack._internal.utils.common import get_or_error
from dstack.api._public.runs import Run

Expand Down Expand Up @@ -76,15 +80,39 @@ def _command(self, args: argparse.Namespace):
run = self.api.runs.get(args.run_name)
if run is None:
raise CLIError(f"Run {args.run_name} not found")

# Show live progress while waiting for the run to be ready
if _is_provisioning(run):
with MultiItemStatus(f"Attaching to [code]{run.name}[/]...", console=console) as live:
while _is_provisioning(run):
live.update(get_runs_table([run]))
time.sleep(5)
run.refresh()
console.print(get_runs_table([run], verbose=run.status == RunStatus.FAILED))
console.print(
f"\nProvisioning [code]{run.name}[/] completed [secondary]({run.status.value})[/]"
)

if run.status.is_finished() and run.status != RunStatus.DONE:
raise CLIError(f"Run {args.run_name} is {run.status.value}")

exit_code = 0
try:
attached = run.attach(
ssh_identity_file=args.ssh_identity_file,
bind_address=args.host,
ports_overrides=args.ports,
replica_num=args.replica,
job_num=args.job,
)
try:
attached = run.attach(
ssh_identity_file=args.ssh_identity_file,
bind_address=args.host,
ports_overrides=args.ports,
replica_num=args.replica,
job_num=args.job,
)
except PortUsedError as e:
console.print(
f"[error]Failed to attach: port [code]{e.port}[/code] is already in use."
f" Use [code]-p[/code] in [code]dstack attach[/code] to override the local"
f" port mapping, e.g. [code]-p {e.port + 1}:{e.port}[/code].[/]"
)
exit(1)
if not attached:
raise CLIError(f"Failed to attach to run {args.run_name}")
_print_attached_message(
Expand Down Expand Up @@ -159,3 +187,11 @@ def _print_attached_message(
output += f"To connect to the run via SSH, use `ssh {name}`.\n"
output += "Press Ctrl+C to detach..."
console.print(output)


def _is_provisioning(run: Run) -> bool:
return run.status in (
RunStatus.SUBMITTED,
RunStatus.PENDING,
RunStatus.PROVISIONING,
)
19 changes: 18 additions & 1 deletion src/dstack/_internal/cli/services/configurators/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
InvalidRepoCredentialsError,
get_repo_creds_and_default_branch,
)
from dstack._internal.core.services.ssh.ports import PortUsedError
from dstack._internal.utils.common import local_time
from dstack._internal.utils.interpolator import InterpolatorError, VariablesInterpolator
from dstack._internal.utils.logging import get_logger
Expand Down Expand Up @@ -168,6 +169,13 @@ def apply_configuration(
)
except ServerClientError as e:
raise CLIError(e.msg)
except PortUsedError as e:
console.print(
f"[error]Failed to submit: port [code]{e.port}[/code] is already in use."
f" Use [code]-p[/code] in [code]dstack apply[/code] to override the local"
f" port mapping, e.g. [code]-p {e.port + 1}:{e.port}[/code].[/]"
)
exit(1)

if command_args.detach:
detach_message = f"Run [code]{run.name}[/] submitted, detaching..."
Expand Down Expand Up @@ -206,7 +214,16 @@ def apply_configuration(
configurator_args, _BIND_ADDRESS_ARG, None
)
try:
if run.attach(bind_address=bind_address):
try:
attached = run.attach(bind_address=bind_address)
except PortUsedError as e:
console.print(
f"[error]Failed to attach: port [code]{e.port}[/code] is already in use."
f" Use [code]-p[/code] in [code]dstack attach[/code] to override the local"
f" port mapping, e.g. [code]-p {e.port + 1}:{e.port}[/code].[/]"
)
exit(1)
if attached:
for entry in run.logs():
sys.stdout.buffer.write(entry)
sys.stdout.buffer.flush()
Expand Down
8 changes: 5 additions & 3 deletions src/dstack/_internal/core/services/ssh/ports.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@


class PortUsedError(DstackError):
pass
def __init__(self, port: int):
self.port = port
super().__init__(f"Port {port} is already in use")


class PortsLock:
Expand All @@ -28,10 +30,10 @@ def acquire(self) -> "PortsLock":
if not local_port: # None or 0
continue
if local_port in assigned_ports:
raise PortUsedError(f"Port {local_port} is already in use")
raise PortUsedError(local_port)
sock = self._listen(local_port)
if sock is None:
raise PortUsedError(f"Port {local_port} is already in use")
raise PortUsedError(local_port)
self.sockets[remote_port] = sock
assigned_ports.add(local_port)

Expand Down