From 8164b74d187704e18d1c1e814fadb997ff95366b Mon Sep 17 00:00:00 2001 From: Amy Krause Date: Tue, 4 Mar 2025 15:45:27 +0000 Subject: [PATCH] new copy functionality --- src/jupyter_remote/k8s_copy.py | 294 ++++++++++++++++++++++++++++++++ src/jupyter_remote/kube_exec.py | 42 +++-- 2 files changed, 323 insertions(+), 13 deletions(-) create mode 100644 src/jupyter_remote/k8s_copy.py diff --git a/src/jupyter_remote/k8s_copy.py b/src/jupyter_remote/k8s_copy.py new file mode 100644 index 0000000..2db93de --- /dev/null +++ b/src/jupyter_remote/k8s_copy.py @@ -0,0 +1,294 @@ +import os +import tarfile +from abc import ABCMeta, abstractmethod +from select import select +from tempfile import NamedTemporaryFile, TemporaryFile +import yaml + +from kubernetes.stream import stream +from kubernetes.stream.ws_client import ( + ABNF, + ERROR_CHANNEL, + STDERR_CHANNEL, + STDOUT_CHANNEL, +) + + +class K8SCopy(): + def __init__(self, client, name, namespace, local_path, remote_path): + self.client = client + self.name = name + self.namespace = namespace + self.remote_path = remote_path + self.local_path = local_path + + def _run_from_pod(self, cmd): + resp = stream( + self.client.connect_get_namespaced_pod_exec, + self.name, + self.namespace, + command=cmd, + async_req=False, + stderr=True, + stdin=False, + stdout=True, + tty=False, + _preload_content=False, + ) + + stderr, stdout = [], [] + while resp.is_open(): + resp.update(timeout=1) + if resp.peek_stdout(): + stdout.extend(resp.read_stdout().rstrip("\n").split("\n")) + if resp.peek_stderr(): + stderr.extend(resp.read_stderr().rstrip("\n").split("\n")) + error = resp.read_channel(ERROR_CHANNEL) + resp.close() + error = yaml.safe_load(error) + return error, stdout, stderr + + def is_directory_path_from_pod(self, file_path, failed_if_not_exists=True): + # check if file exists + error, out, err = self._run_from_pod(cmd=["test", "-e", file_path]) + if error.get("status") != "Success": + if failed_if_not_exists: + return None, "%s does not exist in remote pod filesystem" % file_path + return False, None + error, out, err = self._run_from_pod(cmd=["test", "-d", file_path]) + return error.get("status") == "Success", None + + +class K8SCopyFromPod(K8SCopy): + """ + Copy files/directory from Pod into local filesystem + """ + + def __init__(self, client, name, namespace, local_path, remote_path): + super().__init__(client, name, namespace, local_path, remote_path) + self.is_remote_path_dir = None + self.files_to_copy = [] + self._shellname = None + + @property + def pod_shell(self): + if self._shellname is None: + for s in ("/bin/sh", "/bin/bash"): + error, out, err = self._run_from_pod(s) + if error.get("status") == "Success": + self._shellname = s + break + return self._shellname + + def listfiles_with_find(self, path): + find_cmd = ["find", path, "-type", "f"] + error, files, err = self._run_from_pod(cmd=find_cmd) + if error.get("status") != "Success": + raise Exception(error.get("message")) + return files + + def listfile_with_echo(self, path): + echo_cmd = [ + self.pod_shell, + "-c", + "echo {path}/* {path}/.*".format( + path=path.translate(str.maketrans({" ": r"\ "})) + ), + ] + error, out, err = self._run_from_pod(cmd=echo_cmd) + if error.get("status") != "Success": + raise Exception(error.get("message")) + + files = [] + if out: + output = out[0] + " " + files = [ + os.path.join(path, p[:-1]) + for p in output.split(f"{path}/") + if p and p[:-1] not in (".", "..") + ] + + result = [] + for f in files: + is_dir, err = self.is_directory_path_from_pod(f) + if err: + continue + if not is_dir: + result.append(f) + continue + result += self.listfile_with_echo(f) + return result + + def list_remote_files(self): + """ + This method will check if the remote path is a dir or file + if it is a directory the file list will be updated accordingly + """ + # check is remote path exists and is a file or directory + is_dir, error = self.is_directory_path_from_pod(self.remote_path) + if error: + raise Exception(error) + + if not is_dir: + return [self.remote_path] + else: + # find executable to list dir with + executables = dict( + find=self.listfiles_with_find, + echo=self.listfile_with_echo, + ) + for item in executables: + error, out, err = self._run_from_pod(item) + if error.get("status") == "Success": + return executables.get(item)(self.remote_path) + + def read(self): + self.stdout = None + self.stderr = None + + if self.response.is_open(): + if not self.response.sock.connected: + self.response._connected = False + else: + ret, out, err = select((self.response.sock.sock,), (), (), 0) + if ret: + code, frame = self.response.sock.recv_data_frame(True) + if code == ABNF.OPCODE_CLOSE: + self.response._connected = False + elif ( + code in (ABNF.OPCODE_BINARY, ABNF.OPCODE_TEXT) + and len(frame.data) > 1 + ): + channel = frame.data[0] + content = frame.data[1:] + if content: + if channel == STDOUT_CHANNEL: + self.stdout = content + elif channel == STDERR_CHANNEL: + self.stderr = content.decode("utf-8", "replace") + + def copy(self): + is_remote_path_dir = ( + len(self.files_to_copy) > 1 or self.files_to_copy[0] != self.remote_path + ) + relpath_start = self.remote_path + if is_remote_path_dir and os.path.isdir(self.local_path): + relpath_start = os.path.dirname(self.remote_path) + + for remote_file in self.files_to_copy: + dest_file = self.local_path + if is_remote_path_dir: + dest_file = os.path.join( + self.local_path, + os.path.relpath(remote_file, start=relpath_start), + ) + # create directory to copy file in + os.makedirs(os.path.dirname(dest_file), exist_ok=True) + + pod_command = ["cat", remote_file] + self.response = stream( + self.client.connect_get_namespaced_pod_exec, + self.name, + self.namespace, + command=pod_command, + stderr=True, + stdin=True, + stdout=True, + tty=False, + _preload_content=False, + ) + errors = [] + with open(dest_file, "wb") as fh: + while self.response._connected: + self.read() + if self.stdout: + fh.write(self.stdout) + if self.stderr: + errors.append(self.stderr) + if errors: + raise Exception("Failed to copy file from Pod: {0}".format("".join(errors))) + + def run(self): + self.files_to_copy = self.list_remote_files() + if self.files_to_copy == []: + return + self.copy() + print(f"{self.remote_path} in remote Pod successfully copied into {self.local_path}") + +class K8SCopyToPod(K8SCopy): + """ + Copy files/directory from local filesystem into remote Pod + """ + + def __init__(self, client, name, namespace, local_path, remote_path): + super().__init__(client, name, namespace, local_path, remote_path) + self.files_to_copy = list() + + def run(self): + # remove trailing slash from destination path + dest_file = self.remote_path.rstrip("/") + src_file = self.local_path + if not os.path.exists(self.local_path): + raise Exception("{0} does not exist in local filesystem".format(self.local_path)) + if not os.access(self.local_path, os.R_OK): + raise Exception("{0} not readable".format(self.local_path)) + + is_dir, err = self.is_directory_path_from_pod( + self.remote_path, failed_if_not_exists=False + ) + if err: + raise Exception(err) + if is_dir: + dest_file = os.path.join(dest_file, os.path.basename(src_file)) + + tar_command = [ + "tar", + "--no-same-permissions", + "--no-same-owner", + "-xmf", + "-", + ] + + if dest_file.startswith("/"): + tar_command.extend(["-C", "/"]) + + response = stream( + self.client.connect_get_namespaced_pod_exec, + self.name, + self.namespace, + command=tar_command, + stderr=True, + stdin=True, + stdout=True, + tty=False, + _preload_content=False, + ) + with TemporaryFile() as tar_buffer: + with tarfile.open(fileobj=tar_buffer, mode="w") as tar: + tar.add(src_file, dest_file) + tar_buffer.seek(0) + commands = [] + # push command in chunk mode + size = 1024 * 1024 + while True: + data = tar_buffer.read(size) + if not data: + break + commands.append(data) + + stderr, stdout = [], [] + while response.is_open(): + if response.peek_stdout(): + stdout.append(response.read_stdout().rstrip("\n")) + if response.peek_stderr(): + stderr.append(response.read_stderr().rstrip("\n")) + if commands: + cmd = commands.pop(0) + response.write_stdin(cmd) + else: + break + response.close() + if stderr: + raise Exception("Failed to copy local file/directory into Pod due to: {0}".format("".join(stderr))) + + print(f"{self.local_path} successfully copied into remote Pod into {self.remote_path}") diff --git a/src/jupyter_remote/kube_exec.py b/src/jupyter_remote/kube_exec.py index 1089dad..012bd91 100644 --- a/src/jupyter_remote/kube_exec.py +++ b/src/jupyter_remote/kube_exec.py @@ -16,6 +16,8 @@ from .remote import ( list_jobs, ) +from .k8s_copy import K8SCopyFromPod, K8SCopyToPod + from IPython.core.magic import ( Magics, cell_magic, @@ -66,6 +68,7 @@ class ExternalKubernetesMagic(Magics): # delete container? print(f'Failed to connect: {result["message"]}', file=sys.stderr) + @needs_local_scope @line_magic("kube_exec") @cell_magic("kube_exec") @@ -74,14 +77,7 @@ class ExternalKubernetesMagic(Magics): "line", default="", type=str, - nargs='?', - # choices=[ - # '', 'execute', - # 'launch', - # 'connect', - # 'list', - # 'destroy', - # ], + nargs='*', help="Command") @argument( "-k", @@ -159,7 +155,11 @@ class ExternalKubernetesMagic(Magics): def execute(self, line="", cell="", local_ns=None): args = parse_argstring(self.execute, line) # print(args) - if args.line == 'launch': + try: + cmd = args.line[0] + except: + cmd = args.line + if cmd == 'launch': if not Path(args.kubeconfig).expanduser().is_file(): print(f'Error: Invalid kube-config file. No configuration found at {args.kubeconfig}', file=sys.stderr) return @@ -174,7 +174,7 @@ class ExternalKubernetesMagic(Magics): self.name = result['pod_name'] self.job_name = result['job_name'] self.prepare_pod(result) - elif args.line == 'connect': + elif cmd == 'connect': # connect to running container if not Path(args.kubeconfig).expanduser().is_file(): print(f'Error: Invalid kube-config file. No configuration found at {args.kubeconfig}', file=sys.stderr) @@ -185,7 +185,7 @@ class ExternalKubernetesMagic(Magics): result = connect_pod(self.name, args, launch=False) self.client = result['client'] self.prepare_pod(result) - elif args.line == 'destroy': + elif cmd == 'destroy': # delete the container if not Path(args.kubeconfig).expanduser().is_file(): print(f'Error: Invalid kube-config file. No configuration found at {args.kubeconfig}', file=sys.stderr) @@ -207,7 +207,7 @@ class ExternalKubernetesMagic(Magics): self.job_name = None # create a new session id self.session = uuid.uuid1().hex - elif args.line == 'pods': + elif cmd == 'pods': if not Path(args.kubeconfig).expanduser().is_file(): print(f'Error: Invalid kube-config file. No configuration found at {args.kubeconfig}', file=sys.stderr) return @@ -216,7 +216,7 @@ class ExternalKubernetesMagic(Magics): return self.client = get_client_v1(args) return list_pods(self.client, args.namespace) - elif args.line == 'jobs': + elif cmd == 'jobs': if not Path(args.kubeconfig).expanduser().is_file(): print(f'Error: Invalid kube-config file. No configuration found at {args.kubeconfig}', file=sys.stderr) return @@ -225,6 +225,22 @@ class ExternalKubernetesMagic(Magics): return self.batch_client = get_batch_client_v1(args) return list_jobs(self.batch_client, args.namespace) + elif cmd == 'cp': + if self.client is None or self.name is None or self.namespace is None: + print('No remote execution container. Please launch a container first.', file=sys.stderr) + if len(args.line) < 3: + print('Error: Source and destination are required for copy', file=sys.stderr) + return + source = args.line[1] + destination = args.line[2] + if source.startswith('pod:') and not destination.startswith('pod:'): + cp = K8SCopyFromPod(self.client, self.name, self.namespace, remote_path=source[4:], local_path=destination) + cp.run() + elif destination.startswith('pod:') and not source.startswith('pod:'): + cp = K8SCopyToPod(self.client, self.name, self.namespace, local_path=source, remote_path=destination[4:]) + cp.run() + else: + print('Not implemented: Cannot copy local to local or remote to remote.', file=sys.stderr) else: code = cell or line if self.client is None or self.name is None or self.namespace is None: -- GitLab