From 8db8cb3c749e74548119591b56807cd62a04d386 Mon Sep 17 00:00:00 2001 From: Hiroshi Suzuki <20717881+herosi@users.noreply.github.com> Date: Sun, 19 Jan 2025 21:58:06 +0900 Subject: [PATCH 1/4] Added next_id support --- ctfcli/core/challenge.py | 76 ++++++++++++++++++++++++++++++- ctfcli/spec/challenge-example.yml | 7 +++ 2 files changed, 82 insertions(+), 1 deletion(-) diff --git a/ctfcli/core/challenge.py b/ctfcli/core/challenge.py index 553cc91..6ec809e 100644 --- a/ctfcli/core/challenge.py +++ b/ctfcli/core/challenge.py @@ -47,7 +47,7 @@ class Challenge(dict): "type", "extra", "image", "protocol", "host", "connection_info", "healthcheck", "attempts", "flags", "files", "topics", "tags", "files", "hints", - "requirements", "state", "version", + "requirements", "next_id", "state", "version", # fmt: on ] @@ -103,6 +103,8 @@ def is_default_challenge_property(key: str, value: Any) -> bool: if key in ["tags", "hints", "topics", "requirements", "files"] and value == []: return True + if key == "next_id" and value is None: + return True return False @staticmethod @@ -434,6 +436,40 @@ def _set_required_challenges(self): r = self.api.patch(f"/api/v1/challenges/{self.challenge_id}", json=requirements_payload) r.raise_for_status() + def _set_next_id(self, nid): + if type(nid) == str: + # nid by name + # find the challenge id from installed challenges + remote_challenges = self.load_installed_challenges() + for remote_challenge in remote_challenges: + if remote_challenge["name"] == nid: + nid = remote_challenge["id"] + break + if type(nid) == str: + click.secho( + "Challenge cannot find next_id. Maybe it is invalid name or id. It will be cleared.", + fg="yellow", + ) + nid = None + elif type(nid) == int and nid > 0: + # nid by challenge id + # trust it and use it directly + nid = remote_challenge["id"] + else: + nid = None + + if self.challenge_id == nid: + click.secho( + "Challenge cannot set next_id itself. Skipping invalid next_id.", + fg="yellow", + ) + nid = None + + #return nid + next_id_payload = {"next_id": nid} + r = self.api.patch(f"/api/v1/challenges/{self.challenge_id}", json=next_id_payload) + r.raise_for_status() + # Compare challenge requirements, will resolve all IDs to names def _compare_challenge_requirements(self, r1: List[Union[str, int]], r2: List[Union[str, int]]) -> bool: remote_challenges = self.load_installed_challenges() @@ -453,6 +489,21 @@ def normalize_requirements(requirements): return normalize_requirements(r1) == normalize_requirements(r2) + # Compare challenge next_id, will resolve all IDs to names + def _compare_challenge_next_id(self, r1: Union[str, int, None], r2: Union[str, int, None]) -> bool: + def normalize_next_id(r): + normalized = None + if type(r) == int: + remote_challenge = self.load_installed_challenge(r) + if remote_challenge["id"] == r: + normalized = remote_challenge["name"] + else: + normalized = r + + return normalized + + return normalize_next_id(r1) == normalize_next_id(r2) + # Normalize challenge data from the API response to match challenge.yml # It will remove any extra fields from the remote, as well as expand external references # that have to be fetched separately (e.g., files, flags, hints, etc.) @@ -521,6 +572,16 @@ def _normalize_challenge(self, challenge_data: Dict[str, Any]): challenges = r.json()["data"] challenge["requirements"] = [c["name"] for c in challenges if c["id"] in requirements] + # Add next_id + nid = challenge_data.get("next_id", None) + if nid: + # Prefer challenge names over IDs + r = self.api.get(f"/api/v1/challenges/{nid}") + r.raise_for_status() + challenge["next_id"] = r.json()["data"]["name"] + else: + challenge["next_id"] = None + return challenge # Create a dictionary of remote files in { basename: {"url": "", "location": ""} } format @@ -634,6 +695,11 @@ def sync(self, ignore: Tuple[str] = ()) -> None: if challenge.get("requirements") and "requirements" not in ignore: self._set_required_challenges() + # Set next_id + nid = challenge.get("next_id", None) + if "next_id" not in ignore: + self._set_next_id(nid) + make_challenge_visible = False # Bring back the challenge to be visible if: @@ -711,6 +777,11 @@ def create(self, ignore: Tuple[str] = ()) -> None: if challenge.get("requirements") and "requirements" not in ignore: self._set_required_challenges() + # Add next_id + nid = challenge.get("next_id", None) + if "next_id" not in ignore: + self._set_next_id(nid) + # Bring back the challenge if it's supposed to be visible # Either explicitly, or by assuming the default value (possibly because the state is ignored) if challenge.get("state", "visible") == "visible" or "state" in ignore: @@ -864,6 +935,9 @@ def verify(self, ignore: Tuple[str] = ()) -> bool: if key == "requirements": if self._compare_challenge_requirements(challenge[key], normalized_challenge[key]): continue + if key == "next_id": + if self._compare_challenge_next_id(challenge[key], normalized_challenge[key]): + continue return False diff --git a/ctfcli/spec/challenge-example.yml b/ctfcli/spec/challenge-example.yml index 903b4c3..a303265 100644 --- a/ctfcli/spec/challenge-example.yml +++ b/ctfcli/spec/challenge-example.yml @@ -115,6 +115,13 @@ requirements: - "Warmup" - "Are you alive" +# The next_id is used to display a next recommended challenge to a user when +# the user correctly answers the current challenge. +# Can be removed if unused +# Accepts a challenge name as a string, a challenge ID as an integer, or null +# if you want to remove or disable it. +next_id: null + # The state of the challenge. # If the field is omitted, the challenge is visible by default. # If provided, the field can take one of two values: hidden, visible. From 32c55910795e650d11f71e375e34ee8f7509e167 Mon Sep 17 00:00:00 2001 From: Hiroshi Suzuki <20717881+herosi@users.noreply.github.com> Date: Thu, 23 Jan 2025 21:59:52 +0900 Subject: [PATCH 2/4] Some improvements - Added support for anonymize flag in requirements - Added data in flags with static type in challenge-example.yml to avoid verify command fails - Added sort for requirements because it seems CTFd does not accept prerequisites if they are not sorted. - Applied black to challenge.py to avoid Linting error. --- ctfcli/core/challenge.py | 336 ++++++++++++++++++++++++------ ctfcli/spec/challenge-example.yml | 8 + 2 files changed, 275 insertions(+), 69 deletions(-) diff --git a/ctfcli/core/challenge.py b/ctfcli/core/challenge.py index 6ec809e..ad474b1 100644 --- a/ctfcli/core/challenge.py +++ b/ctfcli/core/challenge.py @@ -31,7 +31,9 @@ def str_presenter(dumper, data): fixed_data = "\n".join(text_list) return dumper.represent_scalar("tag:yaml.org,2002:str", fixed_data, style="|") elif len(data) > 80: - return dumper.represent_scalar("tag:yaml.org,2002:str", data.rstrip(), style=">") + return dumper.represent_scalar( + "tag:yaml.org,2002:str", data.rstrip(), style=">" + ) return dumper.represent_scalar("tag:yaml.org,2002:str", data) @@ -47,7 +49,8 @@ class Challenge(dict): "type", "extra", "image", "protocol", "host", "connection_info", "healthcheck", "attempts", "flags", "files", "topics", "tags", "files", "hints", - "requirements", "next_id", "state", "version", + "requirements", "require_anonymize", "next_id", "state", + "version", # fmt: on ] @@ -61,14 +64,18 @@ class Challenge(dict): @staticmethod def load_installed_challenge(challenge_id) -> Dict: api = API() - r = api.get(f"/api/v1/challenges/{challenge_id}") + r = api.get(f"/api/v1/challenges/{challenge_id}?view=admin") if not r.ok: - raise RemoteChallengeNotFound(f"Could not load challenge with id={challenge_id}") + raise RemoteChallengeNotFound( + f"Could not load challenge with id={challenge_id}" + ) installed_challenge = r.json().get("data", None) if not installed_challenge: - raise RemoteChallengeNotFound(f"Could not load challenge with id={challenge_id}") + raise RemoteChallengeNotFound( + f"Could not load challenge with id={challenge_id}" + ) return installed_challenge @@ -94,6 +101,9 @@ def is_default_challenge_property(key: str, value: Any) -> bool: if key == "attempts" and value == 0: return True + if key == "require_anonymize" and value == False: + return True + if key == "state" and value == "visible": return True @@ -112,7 +122,9 @@ def clone(config, remote_challenge): name = remote_challenge["name"] if name is None: - raise ChallengeException(f'Could not get name of remote challenge with id {remote_challenge["id"]}') + raise ChallengeException( + f'Could not get name of remote challenge with id {remote_challenge["id"]}' + ) # First, generate a name for the challenge directory category = remote_challenge.get("category", None) @@ -127,7 +139,9 @@ def clone(config, remote_challenge): # Create an blank/empty challenge, with only the challenge.yml containing the challenge name template_path = config.get_base_path() / "templates" / "blank" / "empty" - log.debug(f"Challenge.clone: cookiecutter({str(template_path)}, {name=}, {challenge_dir_name=}") + log.debug( + f"Challenge.clone: cookiecutter({str(template_path)}, {name=}, {challenge_dir_name=}" + ) cookiecutter( str(template_path), no_input=True, @@ -135,7 +149,9 @@ def clone(config, remote_challenge): ) if not Path(challenge_dir_name).exists(): - raise ChallengeException(f"Could not create challenge directory '{challenge_dir_name}' for '{name}'") + raise ChallengeException( + f"Could not create challenge directory '{challenge_dir_name}' for '{name}'" + ) # Add the newly created local challenge to the config file config["challenges"][challenge_dir_name] = challenge_dir_name @@ -161,7 +177,9 @@ def __init__(self, challenge_yml: Union[str, PathLike], overrides=None): self.challenge_file_path = Path(challenge_yml) if not self.challenge_file_path.is_file(): - raise InvalidChallengeFile(f"Challenge file at {self.challenge_file_path} could not be found") + raise InvalidChallengeFile( + f"Challenge file at {self.challenge_file_path} could not be found" + ) self.challenge_directory = self.challenge_file_path.parent @@ -169,7 +187,9 @@ def __init__(self, challenge_yml: Union[str, PathLike], overrides=None): try: challenge_definition = yaml.safe_load(challenge_file.read()) except yaml.YAMLError as e: - raise InvalidChallengeFile(f"Challenge file at {self.challenge_file_path} could not be loaded:\n{e}") + raise InvalidChallengeFile( + f"Challenge file at {self.challenge_file_path} could not be loaded:\n{e}" + ) if type(challenge_definition) != dict: raise InvalidChallengeFile( @@ -191,7 +211,9 @@ def __init__(self, challenge_yml: Union[str, PathLike], overrides=None): def __str__(self): return self["name"] - def _process_challenge_image(self, challenge_image: Optional[str]) -> Optional[Image]: + def _process_challenge_image( + self, challenge_image: Optional[str] + ) -> Optional[Image]: if not challenge_image: return None @@ -221,12 +243,16 @@ def _process_challenge_image(self, challenge_image: Optional[str]) -> Optional[I # Check if it's a path to dockerfile to be built if (self.challenge_directory / challenge_image / "Dockerfile").exists(): - return Image(slugify(self["name"]), self.challenge_directory / self["image"]) + return Image( + slugify(self["name"]), self.challenge_directory / self["image"] + ) # Check if it's a local pre-built image if ( subprocess.call( - ["docker", "inspect", challenge_image], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL + ["docker", "inspect", challenge_image], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, ) == 0 ): @@ -250,7 +276,9 @@ def _load_challenge_id(self): # return if we failed to determine the challenge id (failed to find the challenge) if self.challenge_id is None: - raise RemoteChallengeNotFound(f"Could not load remote challenge with name '{self['name']}'") + raise RemoteChallengeNotFound( + f"Could not load remote challenge with name '{self['name']}'" + ) def _validate_files(self): files = self.get("files") or [] @@ -284,7 +312,9 @@ def _get_initial_challenge_payload(self, ignore: Tuple[str] = ()) -> Dict: challenge_payload["max_attempts"] = challenge.get("attempts", 0) if "connection_info" not in ignore: - challenge_payload["connection_info"] = challenge.get("connection_info", None) + challenge_payload["connection_info"] = challenge.get( + "connection_info", None + ) if "extra" not in ignore: challenge_payload = {**challenge_payload, **challenge.get("extra", {})} @@ -313,9 +343,13 @@ def _create_flags(self): r.raise_for_status() def _delete_existing_topics(self): - remote_topics = self.api.get(f"/api/v1/challenges/{self.challenge_id}/topics").json()["data"] + remote_topics = self.api.get( + f"/api/v1/challenges/{self.challenge_id}/topics" + ).json()["data"] for topic in remote_topics: - r = self.api.delete(f"/api/v1/topics?type=challenge&target_id={topic['id']}") + r = self.api.delete( + f"/api/v1/topics?type=challenge&target_id={topic['id']}" + ) r.raise_for_status() def _create_topics(self): @@ -369,7 +403,9 @@ def _create_all_files(self): files = self.get("files") or [] for challenge_file in files: - new_files.append(("file", open(self.challenge_directory / challenge_file, mode="rb"))) + new_files.append( + ("file", open(self.challenge_directory / challenge_file, mode="rb")) + ) files_payload = {"challenge_id": self.challenge_id, "type": "challenge"} @@ -431,9 +467,20 @@ def _set_required_challenges(self): fg="yellow", ) required_challenges.remove(self.challenge_id) + required_challenges.sort() - requirements_payload = {"requirements": {"prerequisites": required_challenges}} - r = self.api.patch(f"/api/v1/challenges/{self.challenge_id}", json=requirements_payload) + if self.get("require_anonymize") == None: + self["require_anonymize"] = False + + requirements_payload = { + "requirements": { + "prerequisites": required_challenges, + "anonymize": self["require_anonymize"], + } + } + r = self.api.patch( + f"/api/v1/challenges/{self.challenge_id}", json=requirements_payload + ) r.raise_for_status() def _set_next_id(self, nid): @@ -465,13 +512,16 @@ def _set_next_id(self, nid): ) nid = None - #return nid next_id_payload = {"next_id": nid} - r = self.api.patch(f"/api/v1/challenges/{self.challenge_id}", json=next_id_payload) + r = self.api.patch( + f"/api/v1/challenges/{self.challenge_id}", json=next_id_payload + ) r.raise_for_status() # Compare challenge requirements, will resolve all IDs to names - def _compare_challenge_requirements(self, r1: List[Union[str, int]], r2: List[Union[str, int]]) -> bool: + def _compare_challenge_requirements( + self, r1: List[Union[str, int]], r2: List[Union[str, int]] + ) -> bool: remote_challenges = self.load_installed_challenges() def normalize_requirements(requirements): @@ -487,10 +537,16 @@ def normalize_requirements(requirements): return normalized - return normalize_requirements(r1) == normalize_requirements(r2) + nr1 = normalize_requirements(r1) + nr1.sort() + nr2 = normalize_requirements(r2) + nr2.sort() + return nr1 == nr2 # Compare challenge next_id, will resolve all IDs to names - def _compare_challenge_next_id(self, r1: Union[str, int, None], r2: Union[str, int, None]) -> bool: + def _compare_challenge_next_id( + self, r1: Union[str, int, None], r2: Union[str, int, None] + ) -> bool: def normalize_next_id(r): normalized = None if type(r) == int: @@ -513,13 +569,31 @@ def normalize_next_id(r): def _normalize_challenge(self, challenge_data: Dict[str, Any]): challenge = {} - copy_keys = ["name", "category", "attribution", "value", "type", "state", "connection_info"] + copy_keys = [ + "name", + "category", + "attribution", + "value", + "type", + "state", + "connection_info", + ] for key in copy_keys: if key in challenge_data: challenge[key] = challenge_data[key] - challenge["description"] = challenge_data["description"].strip().replace("\r\n", "\n").replace("\t", "") - challenge["attribution"] = challenge_data.get("attribution", "").strip().replace("\r\n", "\n").replace("\t", "") + challenge["description"] = ( + challenge_data["description"] + .strip() + .replace("\r\n", "\n") + .replace("\t", "") + ) + challenge["attribution"] = ( + challenge_data.get("attribution", "") + .strip() + .replace("\r\n", "\n") + .replace("\t", "") + ) challenge["attempts"] = challenge_data["max_attempts"] for key in ["initial", "decay", "minimum"]: @@ -534,9 +608,15 @@ def _normalize_challenge(self, challenge_data: Dict[str, Any]): r.raise_for_status() flags = r.json()["data"] challenge["flags"] = [ - f["content"] - if f["type"] == "static" and (f["data"] is None or f["data"] == "") - else {"content": f["content"].strip().replace("\r\n", "\n"), "type": f["type"], "data": f["data"]} + ( + f["content"] + if f["type"] == "static" and (f["data"] is None or f["data"] == "") + else { + "content": f["content"].strip().replace("\r\n", "\n"), + "type": f["type"], + "data": f["data"], + } + ) for f in flags ] @@ -552,7 +632,12 @@ def _normalize_challenge(self, challenge_data: Dict[str, Any]): hints = r.json()["data"] # skipping pre-requisites for hints because they are not supported in ctfcli challenge["hints"] = [ - {"content": h["content"], "cost": h["cost"]} if h["cost"] > 0 else h["content"] for h in hints + ( + {"content": h["content"], "cost": h["cost"]} + if h["cost"] > 0 + else h["content"] + ) + for h in hints ] # Add topics @@ -567,10 +652,16 @@ def _normalize_challenge(self, challenge_data: Dict[str, Any]): requirements = (r.json().get("data") or {}).get("prerequisites", []) if len(requirements) > 0: # Prefer challenge names over IDs - r = self.api.get("/api/v1/challenges") - r.raise_for_status() - challenges = r.json()["data"] - challenge["requirements"] = [c["name"] for c in challenges if c["id"] in requirements] + r2 = self.api.get("/api/v1/challenges?view=admin") + r2.raise_for_status() + challenges = r2.json()["data"] + challenge["requirements"] = [ + c["name"] for c in challenges if c["id"] in requirements + ] + # Add anonymize flag + challenge["require_anonymize"] = (r.json().get("data") or {}).get( + "anonymize", False + ) # Add next_id nid = challenge_data.get("next_id", None) @@ -585,11 +676,16 @@ def _normalize_challenge(self, challenge_data: Dict[str, Any]): return challenge # Create a dictionary of remote files in { basename: {"url": "", "location": ""} } format - def _normalize_remote_files(self, remote_files: List[str]) -> Dict[str, Dict[str, str]]: + def _normalize_remote_files( + self, remote_files: List[str] + ) -> Dict[str, Dict[str, str]]: normalized = {} for f in remote_files: file_parts = f.split("?token=")[0].split("/") - normalized[file_parts[-1]] = {"url": f, "location": f"{file_parts[-2]}/{file_parts[-1]}"} + normalized[file_parts[-1]] = { + "url": f, + "location": f"{file_parts[-2]}/{file_parts[-1]}", + } return normalized @@ -621,13 +717,21 @@ def sync(self, ignore: Tuple[str] = ()) -> None: remote_challenge = self.load_installed_challenge(self.challenge_id) # if value, category, type or description are ignored, revert them to the remote state in the initial payload - reset_properties_if_ignored = ["value", "category", "type", "description", "attribution"] + reset_properties_if_ignored = [ + "value", + "category", + "type", + "description", + "attribution", + ] for p in reset_properties_if_ignored: if p in ignore: challenge_payload[p] = remote_challenge[p] # Update simple properties - r = self.api.patch(f"/api/v1/challenges/{self.challenge_id}", json=challenge_payload) + r = self.api.patch( + f"/api/v1/challenges/{self.challenge_id}", json=challenge_payload + ) r.raise_for_status() # Update flags @@ -668,14 +772,21 @@ def sync(self, ignore: Tuple[str] = ()) -> None: for local_file_name in local_files: # Creating a new file if local_file_name not in remote_files: - self._create_file(self.challenge_directory / local_files[local_file_name]) + self._create_file( + self.challenge_directory / local_files[local_file_name] + ) continue # Updating an existing file # sha1sum is present in CTFd 3.7+, use it instead of always re-uploading the file if possible - remote_file_sha1sum = sha1sums[remote_files[local_file_name]["location"]] + remote_file_sha1sum = sha1sums[ + remote_files[local_file_name]["location"] + ] if remote_file_sha1sum is not None: - with open(self.challenge_directory / local_files[local_file_name], "rb") as lf: + with open( + self.challenge_directory / local_files[local_file_name], + "rb", + ) as lf: local_file_sha1sum = hash_file(lf) if local_file_sha1sum == remote_file_sha1sum: @@ -683,7 +794,9 @@ def sync(self, ignore: Tuple[str] = ()) -> None: # if sha1sums are not present, or the hashes are different, re-upload the file self._delete_file(remote_files[local_file_name]["location"]) - self._create_file(self.challenge_directory / local_files[local_file_name]) + self._create_file( + self.challenge_directory / local_files[local_file_name] + ) # Update hints if "hints" not in ignore: @@ -714,7 +827,9 @@ def sync(self, ignore: Tuple[str] = ()) -> None: make_challenge_visible = True if make_challenge_visible: - r = self.api.patch(f"/api/v1/challenges/{self.challenge_id}", json={"state": "visible"}) + r = self.api.patch( + f"/api/v1/challenges/{self.challenge_id}", json={"state": "visible"} + ) r.raise_for_status() def create(self, ignore: Tuple[str] = ()) -> None: @@ -730,7 +845,10 @@ def create(self, ignore: Tuple[str] = ()) -> None: if not challenge.get("name", False): raise InvalidChallengeDefinition("Challenge does not provide a name") - if not challenge.get("value", False) and challenge.get("type", "standard") != "dynamic": + if ( + not challenge.get("value", False) + and challenge.get("type", "standard") != "dynamic" + ): raise InvalidChallengeDefinition("Challenge does not provide a value") if challenge.get("files", False) and "files" not in ignore: @@ -785,7 +903,9 @@ def create(self, ignore: Tuple[str] = ()) -> None: # Bring back the challenge if it's supposed to be visible # Either explicitly, or by assuming the default value (possibly because the state is ignored) if challenge.get("state", "visible") == "visible" or "state" in ignore: - r = self.api.patch(f"/api/v1/challenges/{self.challenge_id}", json={"state": "visible"}) + r = self.api.patch( + f"/api/v1/challenges/{self.challenge_id}", json={"state": "visible"} + ) r.raise_for_status() def lint(self, skip_hadolint=False, flag_format="flag{") -> bool: @@ -794,17 +914,30 @@ def lint(self, skip_hadolint=False, flag_format="flag{") -> bool: issues = {"fields": [], "dockerfile": [], "hadolint": [], "files": []} # Check if required fields are present - for field in ["name", "author", "category", "description", "attribution", "value"]: + for field in [ + "name", + "author", + "category", + "description", + "attribution", + "value", + ]: # value is allowed to be none if the challenge type is dynamic if field == "value" and challenge.get("type") == "dynamic": continue if challenge.get(field) is None: - issues["fields"].append(f"challenge.yml is missing required field: {field}") + issues["fields"].append( + f"challenge.yml is missing required field: {field}" + ) # Check that the image field and Dockerfile match - if (self.challenge_directory / "Dockerfile").is_file() and challenge.get("image", "") != ".": - issues["dockerfile"].append("Dockerfile exists but image field does not point to it") + if (self.challenge_directory / "Dockerfile").is_file() and challenge.get( + "image", "" + ) != ".": + issues["dockerfile"].append( + "Dockerfile exists but image field does not point to it" + ) # Check that Dockerfile exists and is EXPOSE'ing a port if challenge.get("image") == ".": @@ -812,7 +945,9 @@ def lint(self, skip_hadolint=False, flag_format="flag{") -> bool: has_dockerfile = dockerfile_path.is_file() if not has_dockerfile: - issues["dockerfile"].append("Dockerfile specified in 'image' field but no Dockerfile found") + issues["dockerfile"].append( + "Dockerfile specified in 'image' field but no Dockerfile found" + ) if has_dockerfile: with open(dockerfile_path, "r") as dockerfile: @@ -857,14 +992,18 @@ def lint(self, skip_hadolint=False, flag_format="flag{") -> bool: for s in strings(challenge_file_path): if flag_format in s: s = s.strip() - issues["files"].append(f"Potential flag found in distributed file '{challenge_file}':\n {s}") + issues["files"].append( + f"Potential flag found in distributed file '{challenge_file}':\n {s}" + ) if any(messages for messages in issues.values() if len(messages) > 0): raise LintException(issues=issues) return True - def mirror(self, files_directory_name: str = "dist", ignore: Tuple[str] = ()) -> None: + def mirror( + self, files_directory_name: str = "dist", ignore: Tuple[str] = () + ) -> None: self._load_challenge_id() remote_challenge = self.load_installed_challenge(self.challenge_id) challenge = self._normalize_challenge(remote_challenge) @@ -887,22 +1026,34 @@ def mirror(self, files_directory_name: str = "dist", ignore: Tuple[str] = ()) -> r.raise_for_status() # Ensure the directory for the challenge files exists - challenge_files_directory = self.challenge_directory / files_directory_name + challenge_files_directory = ( + self.challenge_directory / files_directory_name + ) challenge_files_directory.mkdir(parents=True, exist_ok=True) - (challenge_files_directory / remote_file_name).write_bytes(r.content) - challenge["files"].append(f"{files_directory_name}/{remote_file_name}") + (challenge_files_directory / remote_file_name).write_bytes( + r.content + ) + challenge["files"].append( + f"{files_directory_name}/{remote_file_name}" + ) # The file is already present in the challenge.yml - we know the desired path else: r = self.api.get(remote_file) r.raise_for_status() - (self.challenge_directory / local_files[remote_file_name]).write_bytes(r.content) + ( + self.challenge_directory / local_files[remote_file_name] + ).write_bytes(r.content) # Soft-Delete files that are not present on the remote # Remove them from challenge.yml but do not delete them from disk - remote_file_names = [f.split("/")[-1].split("?token=")[0] for f in remote_challenge["files"]] - challenge["files"] = [f for f in challenge["files"] if Path(f).name in remote_file_names] + remote_file_names = [ + f.split("/")[-1].split("?token=")[0] for f in remote_challenge["files"] + ] + challenge["files"] = [ + f for f in challenge["files"] if Path(f).name in remote_file_names + ] for key in challenge.keys(): if key not in ignore: @@ -929,16 +1080,30 @@ def verify(self, ignore: Tuple[str] = ()) -> bool: if self.is_default_challenge_property(key, normalized_challenge[key]): continue + click.secho( + f"{key} is not in challenge.", + fg="yellow", + ) + return False if challenge[key] != normalized_challenge[key]: if key == "requirements": - if self._compare_challenge_requirements(challenge[key], normalized_challenge[key]): + if self._compare_challenge_requirements( + challenge[key], normalized_challenge[key] + ): continue if key == "next_id": - if self._compare_challenge_next_id(challenge[key], normalized_challenge[key]): + if self._compare_challenge_next_id( + challenge[key], normalized_challenge[key] + ): continue + click.secho( + f"{key} comparison failed.", + fg="yellow", + ) + return False # Handle a special case for files, unless they are ignored @@ -948,27 +1113,47 @@ def verify(self, ignore: Tuple[str] = ()) -> bool: self._validate_files() local_files = {Path(f).name: f for f in challenge["files"]} except InvalidChallengeFile: + click.secho( + "InvalidChallengeFile", + fg="yellow", + ) return False remote_files = self._normalize_remote_files(remote_challenge["files"]) # Check if there are no extra local files for local_file in local_files: if local_file not in remote_files: + click.secho( + f"{local_file} is not in remote challenge.", + fg="yellow", + ) return False sha1sums = self._get_files_sha1sums() # Check if all remote files are present locally for remote_file_name in remote_files: if remote_file_name not in local_files: + click.secho( + f"{remote_file_namne} is not in local challenge.", + fg="yellow", + ) return False # sha1sum is present in CTFd 3.7+, use it instead of downloading the file if possible - remote_file_sha1sum = sha1sums[remote_files[remote_file_name]["location"]] + remote_file_sha1sum = sha1sums[ + remote_files[remote_file_name]["location"] + ] if remote_file_sha1sum is not None: - with open(self.challenge_directory / local_files[remote_file_name], "rb") as lf: + with open( + self.challenge_directory / local_files[remote_file_name], "rb" + ) as lf: local_file_sha1sum = hash_file(lf) if local_file_sha1sum != remote_file_sha1sum: + click.secho( + f"sha1sum does not match with remote one.", + fg="yellow", + ) return False return True @@ -977,9 +1162,15 @@ def verify(self, ignore: Tuple[str] = ()) -> bool: r = self.api.get(remote_files[remote_file_name]["url"]) r.raise_for_status() remote_file_contents = r.content - local_file_contents = (self.challenge_directory / local_files[remote_file_name]).read_bytes() + local_file_contents = ( + self.challenge_directory / local_files[remote_file_name] + ).read_bytes() if remote_file_contents != local_file_contents: + click.secho( + f"the file content does not match with the remote one.", + fg="yellow", + ) return False return True @@ -992,7 +1183,8 @@ def save(self): sorted_challenge_dict = { k: challenge_dict[k] for k in self.key_order - if k in challenge_dict and not self.is_default_challenge_property(k, challenge_dict[k]) + if k in challenge_dict + and not self.is_default_challenge_property(k, challenge_dict[k]) } # if there are any additional keys append them at the end @@ -1001,11 +1193,17 @@ def save(self): sorted_challenge_dict[k] = challenge_dict[k] try: - challenge_yml = yaml.safe_dump(sorted_challenge_dict, sort_keys=False, allow_unicode=True) + challenge_yml = yaml.safe_dump( + sorted_challenge_dict, sort_keys=False, allow_unicode=True + ) # attempt to pretty print the yaml (add an extra newline between selected top-level keys) - pattern = "|".join(r"^" + re.escape(key) + r":" for key in self.keys_with_newline) - pretty_challenge_yml = re.sub(pattern, r"\n\g<0>", challenge_yml, flags=re.MULTILINE) + pattern = "|".join( + r"^" + re.escape(key) + r":" for key in self.keys_with_newline + ) + pretty_challenge_yml = re.sub( + pattern, r"\n\g<0>", challenge_yml, flags=re.MULTILINE + ) with open(self.challenge_file_path, "w") as challenge_file: challenge_file.write(pretty_challenge_yml) diff --git a/ctfcli/spec/challenge-example.yml b/ctfcli/spec/challenge-example.yml index a303265..1fe96da 100644 --- a/ctfcli/spec/challenge-example.yml +++ b/ctfcli/spec/challenge-example.yml @@ -61,6 +61,7 @@ flags: - { type: "static", content: "flag{wat}", + data: "case_sensitive", } # A static case insensitive flag - { @@ -115,6 +116,13 @@ requirements: - "Warmup" - "Are you alive" +# Requirements has a flag named "anonymize". It displays anonymized +# challenge if it is set to true. Otherwise, the challenge will be +# hidden until a user satisfys the prerequisites. +# default is false. +# Can be removed if unused +require_anonymize: false + # The next_id is used to display a next recommended challenge to a user when # the user correctly answers the current challenge. # Can be removed if unused From adb2f3e7162db5b7e861070a93ab0afbbb968799 Mon Sep 17 00:00:00 2001 From: Hiroshi Suzuki <20717881+herosi@users.noreply.github.com> Date: Tue, 28 Jan 2025 22:17:37 +0900 Subject: [PATCH 3/4] Updated to pass tests --- tests/core/test_challenge.py | 57 +++++++++++++++++++++++------------- 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/tests/core/test_challenge.py b/tests/core/test_challenge.py index 261e3df..6c03c0b 100644 --- a/tests/core/test_challenge.py +++ b/tests/core/test_challenge.py @@ -125,7 +125,7 @@ def test_load_installed_challenge(self, mock_api: MagicMock): Challenge.load_installed_challenge(1) mock_get = mock_api.return_value.get - mock_get.assert_called_once_with("/api/v1/challenges/1") + mock_get.assert_called_once_with("/api/v1/challenges/1?view=admin") @mock.patch("ctfcli.core.challenge.API") def test_load_installed_challenges(self, mock_api: MagicMock): @@ -211,7 +211,7 @@ def test_updates_simple_properties(self, mock_api_constructor: MagicMock, *args, # expect GET calls loading existing resources to check if something needs to be deleted mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), call("/api/v1/flags"), call("/api/v1/challenges/1/topics"), call("/api/v1/tags"), @@ -223,6 +223,8 @@ def test_updates_simple_properties(self, mock_api_constructor: MagicMock, *args, [ call("/api/v1/challenges/1", json=expected_challenge_payload), call().raise_for_status(), + call("/api/v1/challenges/1", json={"next_id": None}), + call().raise_for_status(), call("/api/v1/challenges/1", json={"state": "visible"}), call().raise_for_status(), ] @@ -253,7 +255,7 @@ def test_updates_attempts(self, mock_api_constructor: MagicMock, *args, **kwargs mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), call("/api/v1/flags"), call("/api/v1/challenges/1/topics"), call("/api/v1/tags"), @@ -301,7 +303,7 @@ def test_updates_extra_properties(self, mock_api_constructor: MagicMock, *args, mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), call("/api/v1/flags"), call("/api/v1/challenges/1/topics"), call("/api/v1/tags"), @@ -373,7 +375,7 @@ def test_updates_flags(self, mock_api_constructor: MagicMock, *args, **kwargs): mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), call("/api/v1/flags"), call("/api/v1/challenges/1/topics"), call("/api/v1/tags"), @@ -455,7 +457,7 @@ def test_updates_topics(self, mock_api_constructor: MagicMock, *args, **kwargs): mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), call("/api/v1/flags"), call("/api/v1/challenges/1/topics"), call("/api/v1/tags"), @@ -516,7 +518,7 @@ def test_updates_tags(self, mock_api_constructor: MagicMock, *args, **kwargs): mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), call("/api/v1/flags"), call("/api/v1/challenges/1/topics"), call("/api/v1/tags"), @@ -570,7 +572,7 @@ def test_updates_files(self, mock_api_constructor: MagicMock, *args, **kwargs): def mock_get(*args, **kwargs): path = args[0] - if path == "/api/v1/challenges/1": + if path == "/api/v1/challenges/1" or path == "/api/v1/challenges/1?view=admin": mock_response = MagicMock() mock_response.json.return_value = {"success": True, "data": self.installed_challenges[0]} return mock_response @@ -595,7 +597,7 @@ def mock_get(*args, **kwargs): mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), call("/api/v1/flags"), call("/api/v1/challenges/1/topics"), call("/api/v1/tags"), @@ -682,7 +684,7 @@ def test_updates_hints(self, mock_api_constructor: MagicMock, *args, **kwargs): mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), call("/api/v1/flags"), call("/api/v1/challenges/1/topics"), call("/api/v1/tags"), @@ -736,7 +738,7 @@ def test_updates_requirements(self, mock_api_constructor: MagicMock, *args, **kw mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), ] ) @@ -745,7 +747,9 @@ def test_updates_requirements(self, mock_api_constructor: MagicMock, *args, **kw call("/api/v1/challenges/1", json=expected_challenge_payload), call().raise_for_status(), # challenge 2 retrieved by name, and challenge 3 retrieved by id - call("/api/v1/challenges/1", json={"requirements": {"prerequisites": [2, 3]}}), + call("/api/v1/challenges/1", json={"requirements": {"prerequisites": [2, 3], "anonymize": False}}), + call().raise_for_status(), + call("/api/v1/challenges/1", json={"next_id": None}), call().raise_for_status(), ] ) @@ -796,7 +800,7 @@ def mock_get(*args, **kwargs): [ call("/api/v1/challenges/1", json=expected_challenge_payload), call().raise_for_status(), - call("/api/v1/challenges/1", json={"requirements": {"prerequisites": [2]}}), + call("/api/v1/challenges/1", json={"requirements": {"prerequisites": [2], "anonymize": False}}), call().raise_for_status(), ], any_order=True, @@ -810,7 +814,7 @@ def mock_get(*args, **kwargs): mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), ] ) mock_api.post.assert_not_called() @@ -840,7 +844,7 @@ def test_defaults_to_standard_challenge_type(self, mock_api_constructor: MagicMo mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), call("/api/v1/flags"), call("/api/v1/challenges/1/topics"), call("/api/v1/tags"), @@ -880,7 +884,7 @@ def test_defaults_to_visible_state(self, mock_api_constructor: MagicMock, *args, mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), call("/api/v1/flags"), call("/api/v1/challenges/1/topics"), call("/api/v1/tags"), @@ -892,6 +896,8 @@ def test_defaults_to_visible_state(self, mock_api_constructor: MagicMock, *args, [ call("/api/v1/challenges/1", json=expected_challenge_payload), call().raise_for_status(), + call("/api/v1/challenges/1", json={"next_id": None}), + call().raise_for_status(), # this tests the real assigned state call("/api/v1/challenges/1", json={"state": "visible"}), call().raise_for_status(), @@ -988,7 +994,7 @@ def test_updates_multiple_attributes_at_once(self, mock_api_constructor: MagicMo mock_api.get.assert_has_calls( [ - call("/api/v1/challenges/1"), + call("/api/v1/challenges/1?view=admin"), call("/api/v1/flags"), call("/api/v1/challenges/1/topics"), call("/api/v1/tags"), @@ -1002,7 +1008,9 @@ def test_updates_multiple_attributes_at_once(self, mock_api_constructor: MagicMo [ call("/api/v1/challenges/1", json=expected_challenge_payload), call().raise_for_status(), - call("/api/v1/challenges/1", json={"requirements": {"prerequisites": [2]}}), + call("/api/v1/challenges/1", json={"requirements": {"prerequisites": [2], "anonymize": False}}), + call().raise_for_status(), + call("/api/v1/challenges/1", json={"next_id": None}), call().raise_for_status(), call("/api/v1/challenges/1", json={"state": "visible"}), call().raise_for_status(), @@ -1122,6 +1130,8 @@ def test_does_not_update_ignored_attributes(self): [ call("/api/v1/challenges/1", json=expected_challenge_payload), call().raise_for_status(), + call("/api/v1/challenges/1", json={"next_id": None}), + call().raise_for_status(), call("/api/v1/challenges/1", json={"state": "visible"}), call().raise_for_status(), ] @@ -1234,7 +1244,9 @@ def mock_post(*args, **kwargs): mock_api.patch.assert_has_calls( [ - call("/api/v1/challenges/3", json={"requirements": {"prerequisites": [1, 2]}}), + call("/api/v1/challenges/3", json={"requirements": {"prerequisites": [1, 2], "anonymize": False}}), + call().raise_for_status(), + call("/api/v1/challenges/3", json={"next_id": None}), call().raise_for_status(), call("/api/v1/challenges/3", json={"state": "visible"}), call().raise_for_status(), @@ -1529,7 +1541,7 @@ def mock_get(self, *args, **kwargs): mock_response.json.return_value = {"success": True, "data": self.installed_challenges} return mock_response - if path == "/api/v1/challenges/3": + if path == "/api/v1/challenges/3" or path == "/api/v1/challenges/3?view=admin": mock_response = MagicMock() mock_response.json.return_value = { "success": True, @@ -1540,7 +1552,7 @@ def mock_get(self, *args, **kwargs): "description": "Test Description", "attribution": "Test Attribution", "connection_info": "https://example.com", - "next_id": None, + "next_id": 0, "category": "Test", "state": "visible", "max_attempts": 5, @@ -1733,6 +1745,8 @@ def test_normalize_fetches_and_normalizes_challenge(self, mock_api_constructor: "tags": ["tag-1", "tag-2"], "hints": ["free hint", {"content": "paid hint", "cost": 100}], "topics": ["topic-1", "topic-2"], + "next_id": None, + "require_anonymize": False, "requirements": ["First Test Challenge", "Other Test Challenge"], "extra": { "initial": 100, @@ -1753,6 +1767,7 @@ def test_verify_checks_if_challenge_is_the_same(self, mock_api_constructor: Magi # pop keys with default values to see if they are ignored for p in ["type", "state"]: challenge.pop(p) + print(challenge) challenge.challenge_id = 3 self.assertTrue(challenge.verify(ignore=["files"])) From 49991d850f53d1734acd6ae72ce2ffa5569d2faa Mon Sep 17 00:00:00 2001 From: Hiroshi Suzuki <20717881+herosi@users.noreply.github.com> Date: Tue, 28 Jan 2025 22:21:08 +0900 Subject: [PATCH 4/4] deleted debug print --- tests/core/test_challenge.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/core/test_challenge.py b/tests/core/test_challenge.py index 6c03c0b..7d36535 100644 --- a/tests/core/test_challenge.py +++ b/tests/core/test_challenge.py @@ -1767,7 +1767,6 @@ def test_verify_checks_if_challenge_is_the_same(self, mock_api_constructor: Magi # pop keys with default values to see if they are ignored for p in ["type", "state"]: challenge.pop(p) - print(challenge) challenge.challenge_id = 3 self.assertTrue(challenge.verify(ignore=["files"]))