From f54191b7746d24a79d6264accdba5ce641364b15 Mon Sep 17 00:00:00 2001 From: Gareth Latty Date: Sun, 28 Apr 2019 01:43:00 +0100 Subject: [PATCH] v2.0.0: ZiX-12B full support, fixes, restructure. --- .gitignore | 5 + README.md | 85 +++++++---- mypy.ini | 7 + setup.py | 26 ++++ unrpa | 300 ------------------------------------- unrpa/__init__.py | 221 +++++++++++++++++++++++++++ unrpa/__main__.py | 164 ++++++++++++++++++++ unrpa/errors.py | 59 ++++++++ unrpa/versions/__init__.py | 0 unrpa/versions/alt.py | 21 +++ unrpa/versions/errors.py | 21 +++ unrpa/versions/rpa.py | 41 +++++ unrpa/versions/version.py | 46 ++++++ unrpa/versions/zix.py | 124 +++++++++++++++ unrpa/view.py | 39 +++++ 15 files changed, 833 insertions(+), 326 deletions(-) create mode 100644 .gitignore create mode 100644 mypy.ini create mode 100644 setup.py delete mode 100755 unrpa create mode 100755 unrpa/__init__.py create mode 100644 unrpa/__main__.py create mode 100644 unrpa/errors.py create mode 100644 unrpa/versions/__init__.py create mode 100644 unrpa/versions/alt.py create mode 100644 unrpa/versions/errors.py create mode 100644 unrpa/versions/rpa.py create mode 100644 unrpa/versions/version.py create mode 100644 unrpa/versions/zix.py create mode 100644 unrpa/view.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a0f4ff0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +__pycache__ +.idea +dist/ +build +*.egg-info \ No newline at end of file diff --git a/README.md b/README.md index 85d07f3..094d604 100644 --- a/README.md +++ b/README.md @@ -2,25 +2,64 @@ ## About -unrpa is a script to extract files from the RPA archive format created -for [the Ren'Py Visual Novel Engine](http://www.renpy.org/). +unrpa is a tool to extract files from the RPA archive format (from +[the Ren'Py Visual Novel Engine](http://www.renpy.org/)). + +It can also be used as a library. 
+ +## Installation + +### Package manager + +The best way to install unrpa is through your package manager, if a package is available for your operating system. +I maintain [an AUR package](https://aur.archlinux.org/packages/unrpa/) for Arch Linux users. + +### pip + +You can also install unrpa through pip, the Python package manager. You can do this on Windows with: + + py -3 -m pip install "unrpa" + +Or use `python3` rather than `py -3` on unix systems. You can see +[the official documentation](https://packaging.python.org/tutorials/installing-packages/) for more help installing +through pip. + +### From source + +You can also [download the latest release](https://github.com/Lattyware/unrpa/releases/latest) +and extract it. ## Dependencies -You will need Python 3.4 or later in order to run it (either install through +You will need Python 3.7 or later in order to run it (either install through your package manager or [directly from python.org](https://www.python.org/downloads/)). -## Installation +If you are trying to extract more exotic RPA archives, there may be additional dependencies. unrpa should instruct +you how to install them if required. + +### Examples + +When installed through your package manager or pip, you should be able to use unrpa by opening a terminal or command +prompt and doing something like: + + unrpa -mp "path/to/output/dir" "path/to/archive.rpa" + +If you are running from source, you will need to execute python directly: -You can [download the latest release](https://github.com/Lattyware/unrpa/releases/latest) -and then run the script as described below. 
+ - On most unix systems, open a terminal in the directory containing unrpa then: + + python3 -m unrpa -mp "path/to/output/dir" "path/to/archive.rpa" + + - On most Windows systems, open a Command Prompt in the directory containing unrpa then: + + py -3 -m unrpa -mp "path\to\output\dir" "path\to\archive.rpa" ## Command Line Usage ``` usage: unrpa [-h] [-v] [-s] [-l] [-p PATH] [-m] [-f VERSION] - [--continue-on-error] + [--continue-on-error] [-o OFFSET] [-k KEY] [--version] FILENAME ``` @@ -30,22 +69,16 @@ usage: unrpa [-h] [-v] [-s] [-l] [-p PATH] [-m] [-f VERSION] |---------------------|--------------------------| | FILENAME | the RPA file to extract. | -| Optional Argument | Description | -|------------------------------|------------------------------------------------------------| -| -h, --help | show this help message and exit | -| -v, --verbose | explain what is being done [default]. | -| -s, --silent | no output. | -| -l, --list | only list contents, do not extract. | -| -p PATH, --path PATH | will extract to the given path. | -| -m, --mkdir | will make any non-existent directories in extraction path. | -| -f VERSION, --force VERSION | forces an archive version. May result in failure. | -| --continue-on-error | try to continue extraction when something goes wrong. | - -### Examples - - - On most unix systems, open a terminal, then: - `python3 unrpa -mp "path/to/output/dir" "path/to/archive.rpa"` - - On most Windows systems, open a Command Prompt, then: - `py -3 unrpa -mp "path\to\output\dir" "path\to\archive.rpa"` - - +| Optional Argument | Description | +|------------------------------|----------------------------------------------------------------| +| -h, --help | show this help message and exit | +| -v, --verbose | explain what is being done [default]. | +| -s, --silent | no output. | +| -l, --list | only list contents, do not extract. | +| -p PATH, --path PATH | will extract to the given path. 
| +| -m, --mkdir | will make any non-existent directories in extraction path. | +| -f VERSION, --force VERSION | forces an archive version. May result in failure.
Possible versions: RPA-3.0, ZiX-12B, ALT-1.0, RPA-2.0, RPA-1.0. | +| --continue-on-error | try to continue extraction when something goes wrong. | +| -o OFFSET, --offset OFFSET | sets an offset to be used to decode unsupported archives. | +| -k KEY, --key KEY | sets a key to be used to decode unsupported archives. | +| --version | show program's version number and exit | diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..0983e4a --- /dev/null +++ b/mypy.ini @@ -0,0 +1,7 @@ +[mypy] +python_version = 3.7 +warn_unused_configs = True +disallow_untyped_defs = True +disallow_incomplete_defs = True +disallow_any_generics = True +strict_optional = True diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..06a89bf --- /dev/null +++ b/setup.py @@ -0,0 +1,26 @@ +import setuptools # type: ignore + +with open("README.md", "r") as readme: + long_description = readme.read() + +setuptools.setup( + name="unrpa", + version="2.0.0", + author="Gareth Latty", + author_email="gareth@lattyware.co.uk", + description="Extract files from the RPA archive format (from the Ren'Py Visual Novel Engine).", + long_description=long_description, + long_description_content_type="text/markdown", + url="https://github.com/Lattyware/unrpa", + packages=setuptools.find_packages(), + python_requires=">=3.7", + keywords="renpy rpa archive extract", + classifiers=[ + "Topic :: System :: Archiving", + "Programming Language :: Python :: 3.7", + "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)", + "Operating System :: OS Independent", + "Environment :: Console", + ], + entry_points={"console_scripts": ["unrpa = unrpa:__main__"]}, +) diff --git a/unrpa b/unrpa deleted file mode 100755 index c8b0c1b..0000000 --- a/unrpa +++ /dev/null @@ -1,300 +0,0 @@ -#!/usr/bin/env python3 - -""" -unrpa is a tool to extract files from Ren'Py archives (.rpa). 
- -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see . -""" - -import os -import argparse -import sys -import pickle -import zlib -import traceback - - -class Version: - def __init__(self, name): - self.name = name - - def find_offset_and_key(self, file): - raise NotImplementedError() - - def detect(self, extension, first_line): - raise NotImplementedError() - - def __str__(self): - return self.name - - -class RPA1(Version): - def __init__(self): - super().__init__("RPA-1.0") - - def detect(self, extension, first_line): - return extension == ".rpi" - - def find_offset_and_key(self, file): - return 0, None - - -class HeaderBasedVersion(Version): - def __init__(self, name, header): - super().__init__(name) - self.header = header - - def find_offset_and_key(self, file): - raise NotImplementedError() - - def detect(self, extension, first_line): - return first_line.startswith(self.header) - - -class RPA2(HeaderBasedVersion): - def __init__(self): - super().__init__("RPA-2.0", b"RPA-2.0") - - def find_offset_and_key(self, file): - offset = int(file.readline()[8:], 16) - return offset, None - - -class RPA3(HeaderBasedVersion): - def __init__(self): - super().__init__("RPA-3.0", b"RPA-3.0") - - def find_offset_and_key(self, file): - line = file.readline() - parts = line.split() - offset = int(parts[1], 16) - key = int(parts[2], 16) - return offset, key - - -class ALT1(HeaderBasedVersion): - EXTRA_KEY = 0xDABE8DF0 - - def __init__(self): - 
super().__init__("ALT-1.0", b"ALT-1.0") - - def find_offset_and_key(self, file): - line = file.readline() - parts = line.split() - key = int(parts[1], 16) ^ ALT1.EXTRA_KEY - offset = int(parts[2], 16) - return offset, key - - -class ZiX(HeaderBasedVersion): - def __init__(self): - super().__init__("ZiX-12B", b"ZiX-12B") - - def find_offset_and_key(self, file): - # TODO: see https://github.com/Lattyware/unrpa/issues/15 - raise NotImplementedError() - - -RPA1 = RPA1() -RPA2 = RPA2() -RPA3 = RPA3() -ALT1 = ALT1() -ZiX = ZiX() -Versions = [RPA1, RPA2, RPA3, ALT1, ZiX] - - -class UnRPA: - NAME = "unrpa" - - def __init__(self, filename, verbosity=1, path=None, mkdir=False, version=None, continue_on_error=False, - offset_and_key=None): - self.verbose = verbosity - if path: - self.path = os.path.abspath(path) - else: - self.path = os.getcwd() - self.mkdir = mkdir - self.version = version - self.archive = filename - self.continue_on_error = continue_on_error - self.offset_and_key = offset_and_key - self.tty = sys.stdout.isatty() - - def log(self, verbosity, message): - if self.tty and self.verbose > verbosity: - print("{}: {}".format(UnRPA.NAME, message)) - - def log_tty(self, message): - if not self.tty and self.verbose > 1: - print(message) - - def exit(self, message): - sys.exit("{}: error: {}".format(UnRPA.NAME, message)) - - def extract_files(self): - self.log(0, "extracting files.") - if self.mkdir: - self.make_directory_structure(self.path) - if not os.path.isdir(self.path): - self.exit("path doesn't exist, if you want to create it, use -m.") - - index = self.get_index() - total_files = len(index) - for file_number, (path, data) in enumerate(index.items()): - try: - self.make_directory_structure(os.path.join(self.path, os.path.split(path)[0])) - raw_file = self.extract_file(path, data, file_number, total_files) - with open(os.path.join(self.path, path), "wb") as f: - f.write(raw_file) - except BaseException as e: - if self.continue_on_error: - traceback.print_exc() - 
self.log(0, - "error extracting (see above), but --continue-on-error was used, so we will keep going.") - else: - raise Exception("There was an error while trying to extract a file. See the nested exception for " - "more. If you wish to try and extract as much from the archive as possible, please " - "use the --continue-on-error flag.") from e - - def list_files(self): - self.log(1, "listing files:") - paths = self.get_index().keys() - for path in sorted(paths): - print(path) - - def extract_file(self, name, data, file_number, total_files): - self.log(1, "[{:04.2%}] {:>3}".format(file_number / float(total_files), name)) - self.log_tty(name) - offset, dlen, start = data[0] - with open(self.archive, "rb") as f: - f.seek(offset) - raw_file = start + f.read(dlen - len(start)) - return raw_file - - def make_directory_structure(self, name): - self.log(2, "creating directory structure: {}".format(name)) - if not os.path.exists(name): - os.makedirs(name) - - def get_index(self): - if not self.version: - self.version = self.detect_version() - - if self.version == ZiX and (not self.offset_and_key): - self.exit("This archive uses the ZiX-12B obfuscation scheme, which is non-standard and not currently " - "supported by unrpa. 
Please see https://github.com/Lattyware/unrpa/issues/15 for more details.") - elif not self.version: - self.exit("This archive doesn't have a header we recognise, if you know the version of the archive you can " - "try using -f to extract it without the header.") - - with open(self.archive, "rb") as f: - if self.offset_and_key: - offset, key = self.offset_and_key - else: - offset, key = self.version.find_offset_and_key(f) - f.seek(offset) - index = pickle.loads(zlib.decompress(f.read()), encoding="bytes") - if key is not None: - index = self.deobfuscate_index(index, key) - - return {self.ensure_str_path(path).replace("/", os.sep): data for path, data in index.items()} - - def ensure_str_path(self, key): - try: - return key.decode("utf-8") - except AttributeError: - return key - - def detect_version(self): - ext = os.path.splitext(self.archive)[1].lower() - with open(self.archive, "rb") as f: - line = f.readline() - for version in Versions: - if version.detect(ext, line): - return version - return None - - def deobfuscate_index(self, index, key): - return {k: self.deobfuscate_entry(key, v) for k, v in index.items()} - - def deobfuscate_entry(self, key, entry): - if len(entry[0]) == 2: - entry = ((offset, dlen, b"") for offset, dlen in entry) - return [(offset ^ key, dlen ^ key, start) for offset, dlen, start in entry] - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Extract files from the RPA archive format.") - - parser.add_argument("-v", "--verbose", action="count", dest="verbose", default=1, - help="explain what is being done [default].") - parser.add_argument("-s", "--silent", action="store_const", const=0, dest="verbose", - help="no output.") - parser.add_argument("-l", "--list", action="store_true", dest="list", default=False, - help="only list contents, do not extract.") - parser.add_argument("-p", "--path", action="store", type=str, dest="path", default=None, - help="will extract to the given path.") - parser.add_argument("-m", 
"--mkdir", action="store_true", dest="mkdir", default=False, - help="will make any non-existent directories in extraction path.") - parser.add_argument("-f", "--force", action="store", type=str, dest="version", default=None, - help="forces an archive version. May result in failure. Possible versions: " - + ", ".join(str(version) for version in Versions)) - parser.add_argument("--continue-on-error", action="store_true", dest="continue_on_error", default=False, - help="try to continue extraction when something goes wrong.") - parser.add_argument("-o", "--offset", action="store", type=int, dest="offset", default=None, - help="sets an offset to be used to decode ZiX-12B archives.") - parser.add_argument("-k", "--key", action="store", type=int, dest="key", default=None, - help="sets a key to be used to decode ZiX-12B archives.") - - parser.add_argument("filename", metavar="FILENAME", type=str, help="the RPA file to extract.") - - args = parser.parse_args() - - provided_version = None - if args.version: - for version in Versions: - if args.version.lower() == version.name.lower(): - provided_version = version - break - else: - parser.error("The archive version you gave isn't one we recognise - it needs to be one of: " + - ", ".join(str(version) for version in Versions)) - - provided_offset_and_key = None - if args.key and args.offset: - provided_offset_and_key = (args.offset, args.key) - if bool(args.key) != bool(args.offset): - parser.error("If you set a key or offset, you must set both.") - - if args.list and args.path: - parser.error("option -path: only valid when extracting.") - - if args.mkdir and not args.path: - parser.error("option --mkdir: only valid when --path is set.") - - if not args.mkdir and args.path and not os.path.isdir(args.path): - parser.error("No such directory: '{}'. 
Use --mkdir to create it.".format(args.path)) - - if args.list and args.verbose == 0: - parser.error("option --list: can't be silent while listing data.") - - if not os.path.isfile(args.filename): - parser.error("No such file: '{}'.".format(args.filename)) - - extractor = UnRPA(args.filename, args.verbose, args.path, args.mkdir, provided_version, args.continue_on_error, - provided_offset_and_key) - if args.list: - extractor.list_files() - else: - extractor.extract_files() diff --git a/unrpa/__init__.py b/unrpa/__init__.py new file mode 100755 index 0000000..591cf86 --- /dev/null +++ b/unrpa/__init__.py @@ -0,0 +1,221 @@ +import io +import os +import pickle +import sys +import traceback +import zlib +from typing import ( + Union, + Tuple, + Optional, + Dict, + cast, + Iterable, + Type, + BinaryIO, + FrozenSet, +) + +from unrpa.errors import ( + OutputDirectoryNotFoundError, + ErrorExtractingFile, + AmbiguousArchiveError, + UnknownArchiveError, +) +from unrpa.versions import rpa, alt, zix +from unrpa.versions.version import Version +from unrpa.view import ArchiveView + +# Offset, Length +SimpleIndexPart = Tuple[int, int] +SimpleIndexEntry = Iterable[SimpleIndexPart] +# Offset, Length, Prefix +ComplexIndexPart = Tuple[int, int, bytes] +ComplexIndexEntry = Iterable[ComplexIndexPart] +IndexPart = Union[SimpleIndexPart, ComplexIndexPart] +IndexEntry = Iterable[IndexPart] + + +class UnRPA: + """Extraction tool for RPA archives.""" + + name = "unrpa" + + error = 0 + info = 1 + debug = 2 + + provided_versions: FrozenSet[Type[Version]] = frozenset( + {*rpa.versions, *alt.versions, *zix.versions} + ) + + def __init__( + self, + filename: str, + verbosity: int = -1, + path: Optional[str] = None, + mkdir: bool = False, + version: Optional[Type[Version]] = None, + continue_on_error: bool = False, + offset_and_key: Optional[Tuple[int, int]] = None, + extra_versions: FrozenSet[Type[Version]] = frozenset(), + ) -> None: + self.verbose = verbosity + if path: + self.path = 
os.path.abspath(path) + else: + self.path = os.getcwd() + self.mkdir = mkdir + self.version = version + self.archive = filename + self.continue_on_error = continue_on_error + self.offset_and_key = offset_and_key + self.tty = sys.stdout.isatty() + self.versions = UnRPA.provided_versions | extra_versions + + def log( + self, verbosity: int, human_message: str, machine_message: str = None + ) -> None: + if self.tty and self.verbose > verbosity: + print( + human_message if self.tty else machine_message, + file=sys.stderr if verbosity == UnRPA.error else sys.stdout, + ) + + def extract_files(self) -> None: + self.log(UnRPA.error, "Extracting files.") + if self.mkdir: + self.make_directory_structure(self.path) + if not os.path.isdir(self.path): + raise OutputDirectoryNotFoundError(self.path) + + version = self.version() if self.version else self.detect_version() + + with open(self.archive, "rb") as archive: + index = self.get_index(archive, version) + total_files = len(index) + for file_number, (path, data) in enumerate(index.items()): + try: + self.make_directory_structure( + os.path.join(self.path, os.path.split(path)[0]) + ) + file_view = self.extract_file( + path, + data, + file_number, + total_files, + cast(io.BufferedReader, archive), + ) + with open(os.path.join(self.path, path), "wb") as output_file: + version.postprocess(file_view, output_file) + except BaseException as error: + if self.continue_on_error: + self.log( + 0, + f"Error extracting from the archive, but directed to continue on error. 
Detail: " + f"{traceback.format_exc()}.", + ) + else: + raise ErrorExtractingFile(traceback.format_exc()) from error + + def list_files(self) -> None: + self.log(UnRPA.info, "Listing files:") + with open(self.archive, "rb") as archive: + paths = self.get_index(archive).keys() + for path in sorted(paths): + print(path) + + def extract_file( + self, + name: str, + data: ComplexIndexEntry, + file_number: int, + total_files: int, + archive: io.BufferedIOBase, + ) -> ArchiveView: + self.log( + UnRPA.info, f"[{file_number / float(total_files):04.2%}] {name:>3}", name + ) + offset, length, start = next(iter(data)) + return ArchiveView(archive, offset, length, start) + + def make_directory_structure(self, name: str) -> None: + self.log(UnRPA.debug, f"Creating directory structure: {name}") + if not os.path.exists(name): + os.makedirs(name) + + def get_index( + self, archive: BinaryIO, version: Optional[Version] = None + ) -> Dict[str, ComplexIndexEntry]: + if not version: + version = self.version() if self.version else self.detect_version() + + offset = 0 + key: Optional[int] = None + if self.offset_and_key: + offset, key = self.offset_and_key + else: + offset, key = version.find_offset_and_key(archive) + archive.seek(offset) + index: Dict[bytes, IndexEntry] = pickle.loads( + zlib.decompress(archive.read()), encoding="bytes" + ) + if key is not None: + normal_index = UnRPA.deobfuscate_index(key, index) + else: + normal_index = UnRPA.normalise_index(index) + + return { + UnRPA.ensure_str_path(path).replace("/", os.sep): data + for path, data in normal_index.items() + } + + def detect_version(self) -> Version: + potential = (version() for version in self.versions) + ext = os.path.splitext(self.archive)[1].lower() + with open(self.archive, "rb") as f: + header = f.readline() + detected = {version for version in potential if version.detect(ext, header)} + if len(detected) > 1: + raise AmbiguousArchiveError(detected) + try: + return next(iter(detected)) + except StopIteration: + 
raise UnknownArchiveError(header) + + @staticmethod + def ensure_str_path(path: Union[str, bytes]) -> str: + if isinstance(path, str): + return path + else: + return path.decode("utf-8", "replace") + + @staticmethod + def deobfuscate_index( + key: int, index: Dict[bytes, IndexEntry] + ) -> Dict[bytes, ComplexIndexEntry]: + return { + path: UnRPA.deobfuscate_entry(key, entry) for path, entry in index.items() + } + + @staticmethod + def deobfuscate_entry(key: int, entry: IndexEntry) -> ComplexIndexEntry: + return [ + (offset ^ key, length ^ key, start) + for offset, length, start in UnRPA.normalise_entry(entry) + ] + + @staticmethod + def normalise_index( + index: Dict[bytes, IndexEntry] + ) -> Dict[bytes, ComplexIndexEntry]: + return {path: UnRPA.normalise_entry(entry) for path, entry in index.items()} + + @staticmethod + def normalise_entry(entry: IndexEntry) -> ComplexIndexEntry: + return [ + (*cast(SimpleIndexPart, part), b"") + if len(part) == 2 + else cast(ComplexIndexPart, part) + for part in entry + ] diff --git a/unrpa/__main__.py b/unrpa/__main__.py new file mode 100644 index 0000000..9252842 --- /dev/null +++ b/unrpa/__main__.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python3 + +""" +unrpa is a tool to extract files from Ren'Py archives (.rpa). + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . 
+""" + +import argparse +import os +import sys +from typing import Tuple, Optional, Any + +from unrpa import UnRPA +from unrpa.errors import UnRPAError + +parser = argparse.ArgumentParser( + prog="unrpa", + description="Extract files from the RPA archive format (from the Ren'Py Visual Novel Engine).", +) + +parser.add_argument( + "-v", + "--verbose", + action="count", + dest="verbose", + default=1, + help="explain what is being done [default].", +) +parser.add_argument( + "-s", "--silent", action="store_const", const=0, dest="verbose", help="no output." +) +parser.add_argument( + "-l", + "--list", + action="store_true", + dest="list", + default=False, + help="only list contents, do not extract.", +) +parser.add_argument( + "-p", + "--path", + action="store", + type=str, + dest="path", + default=None, + help="will extract to the given path.", +) +parser.add_argument( + "-m", + "--mkdir", + action="store_true", + dest="mkdir", + default=False, + help="will make any non-existent directories in extraction path.", +) +parser.add_argument( + "-f", + "--force", + action="store", + type=str, + dest="version", + default=None, + help="forces an archive version. May result in failure. 
Possible versions: " + + ", ".join(version.name for version in UnRPA.provided_versions) + + ".", +) +parser.add_argument( + "--continue-on-error", + action="store_true", + dest="continue_on_error", + default=False, + help="try to continue extraction when something goes wrong.", +) +parser.add_argument( + "-o", + "--offset", + action="store", + type=int, + dest="offset", + default=None, + help="sets an offset to be used to decode unsupported archives.", +) +parser.add_argument( + "-k", + "--key", + action="store", + type=int, + dest="key", + default=None, + help="sets a key to be used to decode unsupported archives.", +) + +parser.add_argument("--version", action="version", version="%(prog)s 2.0.0") + +parser.add_argument( + "filename", metavar="FILENAME", type=str, help="the RPA file to extract." +) + +args: Any = parser.parse_args() + +provided_version = None +if args.version: + try: + provided_version = next( + version + for version in UnRPA.provided_versions + if args.version.lower() == version.name.lower() + ) + except StopIteration: + parser.error( + "The archive version you gave isn’t one we recognise - it needs to be one of: " + + ", ".join(version.name for version in UnRPA.provided_versions) + ) + +provided_offset_and_key: Optional[Tuple[int, int]] = None +if args.key and args.offset: + provided_offset_and_key = (args.offset, args.key) +elif bool(args.key) != bool(args.offset): + parser.error("If you set --key or --offset, you must set both.") + +if args.list and args.path: + parser.error("Option -path: only valid when extracting.") + +if args.mkdir and not args.path: + parser.error("Option --mkdir: only valid when --path is set.") + +if not args.mkdir and args.path and not os.path.isdir(args.path): + parser.error(f"No such directory: “{args.path}”. 
Use --mkdir to create it.") + +if args.list and args.verbose == 0: + parser.error("Option --list: can’t be silent while listing data.") + +if not os.path.isfile(args.filename): + parser.error(f"No such file: “{args.filename}”.") + +try: + extractor = UnRPA( + args.filename, + args.verbose, + args.path, + args.mkdir, + provided_version, + args.continue_on_error, + provided_offset_and_key, + ) + if args.list: + extractor.list_files() + else: + extractor.extract_files() +except UnRPAError as error: + sys.exit(f"\n\033[31m{error.message}\n{error.cmd_line_help}\033[30m") diff --git a/unrpa/errors.py b/unrpa/errors.py new file mode 100644 index 0000000..e224494 --- /dev/null +++ b/unrpa/errors.py @@ -0,0 +1,59 @@ +from typing import Set, Optional + +from unrpa.versions.version import Version + + +class UnRPAError(Exception): + """Any error specific to unrpa.""" + + def __init__(self, message: str, cmd_line_help: Optional[str] = None): + self.message = message + self.cmd_line_help = cmd_line_help + super().__init__(message) + + +class OutputDirectoryNotFoundError(UnRPAError): + """An error for when the given output directory doesn’t exist.""" + + def __init__(self, path: str) -> None: + super().__init__( + f"The given output directory ({path}) does not exist.", + "If you want to create it, use --mkdir.", + ) + + +class UnknownArchiveError(UnRPAError): + """An error for when auto-detection of archive version gives no result.""" + + def __init__(self, header: bytes) -> None: + self.header = header + decoded = header.decode("utf-8", "replace") + super().__init__( + "Auto-detection of the version for this archived failed—it is likely this archive is a version not " + f"supported. Try updating unrpa, or submitting a bug report. 
Header: “{decoded.strip()}”", + "You can try using --force to force a specific version rather than relying on auto-detection.", + ) + + +class AmbiguousArchiveError(UnRPAError): + """An error for when auto-detection of archive version gives an ambiguous result.""" + + def __init__(self, detected: Set[Version]) -> None: + self.versions = detected + detected_list = ", ".join(str(version) for version in detected) + super().__init__( + f"Auto-detection of the version for this archive failed because it is ambiguous. It could be any one of: " + f"{detected_list}.", + "You can try using --force to force these versions and see what works.", + ) + + +class ErrorExtractingFile(UnRPAError): + """A wrapping error for when something goes wrong while extracting a file.""" + + def __init__(self, detail: str) -> None: + super().__init__( + "There was an error while trying to extract a file from the archive.", + "If you wish to try and extract as much from the archive as possible, please use --continue-on-error.\n" + f"Error Detail: {detail}", + ) diff --git a/unrpa/versions/__init__.py b/unrpa/versions/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/unrpa/versions/alt.py b/unrpa/versions/alt.py new file mode 100644 index 0000000..606a03f --- /dev/null +++ b/unrpa/versions/alt.py @@ -0,0 +1,21 @@ +from typing import BinaryIO, Tuple, Optional, FrozenSet, Type + +from unrpa.versions.version import HeaderBasedVersion, Version + + +class ALT1(HeaderBasedVersion): + """A short-lived alternative version of RPA-3.0 from mainline Ren'Py.""" + + name = "ALT-1.0" + header = b"ALT-1.0" + extra_key = 0xDABE8DF0 + + def find_offset_and_key(self, archive: BinaryIO) -> Tuple[int, Optional[int]]: + line = archive.readline() + parts = line.split() + key = int(parts[1], 16) ^ ALT1.extra_key + offset = int(parts[2], 16) + return offset, key + + +versions: FrozenSet[Type[Version]] = frozenset({ALT1}) diff --git a/unrpa/versions/errors.py b/unrpa/versions/errors.py new file mode 
100644
index 0000000..3747c5a
--- /dev/null
+++ b/unrpa/versions/errors.py
@@ -0,0 +1,21 @@
+from typing import Optional
+
+from unrpa.errors import UnRPAError
+
+
+class VersionSpecificRequirementUnmetError(UnRPAError):
+    """An error where the version of the archive has a special need that is unmet."""
+
+    def __init__(self, message: str, cmd_line_help: Optional[str] = None) -> None:
+        super().__init__(message, cmd_line_help)  # both arguments are handed to UnRPAError unchanged
+
+
+class MissingPackageError(VersionSpecificRequirementUnmetError):
+    """An error where the version of the archive requires a Python package that isn't installed."""
+
+    def __init__(self, package: str) -> None:
+        super().__init__(  # first argument: the message; second (two adjacent literals): the command line help
+            f"Extracting from this archive requires the package “{package}”.",
+            f'You can do this by running “pip install "{package}"”. See '
+            f"https://packaging.python.org/tutorials/installing-packages for more help on installing python packages.",
+        )
diff --git a/unrpa/versions/rpa.py b/unrpa/versions/rpa.py
new file mode 100644
index 0000000..8e42901
--- /dev/null
+++ b/unrpa/versions/rpa.py
@@ -0,0 +1,41 @@
+from typing import FrozenSet, BinaryIO, Tuple, Optional, Type
+
+from unrpa.versions.version import ExtensionBasedVersion, HeaderBasedVersion, Version
+
+
+class RPA1(ExtensionBasedVersion):
+    """The first official version of the RPA format."""
+
+    name = "RPA-1.0"
+    extension = ".rpi"  # compared with the archive's extension by ExtensionBasedVersion.detect
+
+    def find_offset_and_key(self, archive: BinaryIO) -> Tuple[int, Optional[int]]:
+        return 0, None  # offset 0 and no key; the archive itself is not read here
+
+
+class RPA2(HeaderBasedVersion):
+    """The second official version of the RPA format."""
+
+    name = "RPA-2.0"
+    header = b"RPA-2.0"  # matched against the start of the first line by HeaderBasedVersion.detect
+
+    def find_offset_and_key(self, archive: BinaryIO) -> Tuple[int, Optional[int]]:
+        offset = int(archive.readline()[8:], 16)  # skip the first 8 bytes of the line, parse the rest as hexadecimal
+        return offset, None
+
+
+class RPA3(HeaderBasedVersion):
+    """The third official version of the RPA format."""
+
+    name = "RPA-3.0"
+    header = b"RPA-3.0"  # matched against the start of the first line by HeaderBasedVersion.detect
+
+    def find_offset_and_key(self, archive: BinaryIO) -> Tuple[int, Optional[int]]:
+        line = archive.readline()
+        parts = line.split()  # whitespace-separated fields; fields 1 and 2 are read below as hexadecimal
+        offset = int(parts[1], 16)
+        key = int(parts[2], 16)
+        return offset, key
+
+
+versions: FrozenSet[Type[Version]] = frozenset({RPA1, RPA2, RPA3})  # the version classes defined in this module
diff --git a/unrpa/versions/version.py b/unrpa/versions/version.py
new file mode 100644
index 0000000..4890817
--- /dev/null
+++ b/unrpa/versions/version.py
@@ -0,0 +1,46 @@
+from abc import ABCMeta, abstractmethod
+from typing import Tuple, Optional, BinaryIO
+
+from unrpa.view import ArchiveView
+
+
+class Version(metaclass=ABCMeta):
+    """An abstract base class for parsing different versions of RPA archive."""
+
+    name: str  # display name; this is what __str__ returns
+
+    @abstractmethod
+    def detect(self, extension: str, first_line: bytes) -> bool:
+        """Detect if an archive is of this version."""
+        raise NotImplementedError()
+
+    @abstractmethod
+    def find_offset_and_key(self, archive: BinaryIO) -> Tuple[int, Optional[int]]:
+        """Find the offset and key values for the archive."""
+        raise NotImplementedError()
+
+    def postprocess(self, source: ArchiveView, sink: BinaryIO) -> None:
+        """Allows postprocessing over the data extracted from the archive."""
+        for segment in iter(source.read1, b""):  # keep calling read1() until it returns b""
+            sink.write(segment)  # the default implementation copies the data through unchanged
+
+    def __str__(self) -> str:
+        return self.name
+
+
+class ExtensionBasedVersion(Version, metaclass=ABCMeta):
+    """A helper for versions where detection is based on the file extension."""
+
+    extension: str  # value the archive's extension must equal for detect() to succeed
+
+    def detect(self, extension: str, first_line: bytes) -> bool:
+        return extension == self.extension  # first_line is ignored
+
+
+class HeaderBasedVersion(Version, metaclass=ABCMeta):
+    """A helper for versions where detection is based on an in-file header."""
+
+    header: bytes  # prefix the archive's first line must start with for detect() to succeed
+
+    def detect(self, extension: str, first_line: bytes) -> bool:
+        return first_line.startswith(self.header)  # extension is ignored
diff --git a/unrpa/versions/zix.py b/unrpa/versions/zix.py
new file mode 100644
index 0000000..6cb1731
--- /dev/null
+++ b/unrpa/versions/zix.py
@@ -0,0 +1,124 @@
+import io
+import os
+import re
+import struct
+import itertools
+from typing import BinaryIO, Tuple, Optional,
FrozenSet, Type + +from unrpa.versions.errors import ( + VersionSpecificRequirementUnmetError, + MissingPackageError, +) +from unrpa.versions.version import HeaderBasedVersion, Version +from unrpa.view import ArchiveView + + +class ZiX12B(HeaderBasedVersion): + """A proprietary format with additional obfuscation.""" + + name = "ZiX-12B" + header = b"ZiX-12B" + + magic_constant = 102464652121606009 + magic_keys = ( + 3621826839565189698, + 8167163782024462963, + 5643161164948769306, + 4940859562182903807, + 2672489546482320731, + 8917212212349173728, + 7093854916990953299, + ) + + loader = "loader.pyo" + + struct_format = " None: + self.key: Optional[int] = None + + def find_offset_and_key(self, archive: BinaryIO) -> Tuple[int, Optional[int]]: + path = os.path.join(os.path.dirname(archive.name), ZiX12B.loader) + try: + import uncompyle6 # type: ignore + except ImportError as e: + raise MissingPackageError("uncompyle6") from e + try: + with io.StringIO() as decompiled: + uncompyle6.decompile_file(path, outstream=decompiled) + match = re.search( + r"verificationcode = _string.sha1\('(.*)'\)", decompiled.getvalue() + ) + if match: + verification_code = match.group(1) + else: + raise IncorrectLoaderError() + except ImportError as e: + raise LoaderRequiredError(path) from e + parts = archive.readline().split() + self.key = ZiX12B.sha1(verification_code) + return ZiX12B.offset(parts[-1]), self.key + + def postprocess(self, source: ArchiveView, sink: BinaryIO) -> None: + """Allows postprocessing over the data extracted from the archive.""" + if self.key: + parts = [] + amount = ZiX12B.obfuscated_amount + while amount > 0: + part = source.read(amount) + amount -= len(part) + parts.append(part) + sink.write(ZiX12B.run(b"".join(parts), self.key)) + else: + raise Exception("find_offset_and_key must be called before postprocess") + for segment in iter(source.read1, b""): + sink.write(segment) + + # The following code is reverse engineered from the cython "_string.pyd" file 
courtesy of omegalink12.
+    # https://github.com/Lattyware/unrpa/issues/15#issuecomment-485014225
+
+    @staticmethod
+    def sha1(code: str) -> int:  # despite the name no SHA-1 is computed: this derives the integer key from code
+        a = int("".join(filter(str.isdigit, code))) + ZiX12B.magic_constant  # the digits of code read as one integer
+        b = round(a ** (1 / 3)) / 23 * 109  # float arithmetic; truncated to an int below
+        return int(b)
+
+    @staticmethod
+    def offset(value: bytes) -> int:  # reorder the first 8 bytes of value, then parse them as hexadecimal
+        a = value[7:5:-1]  # bytes 7 and 6, in that order
+        b = value[:3]  # bytes 0, 1 and 2
+        c = value[5:2:-1]  # bytes 5, 4 and 3, in that order
+        return int(a + b + c, 16)
+
+    @staticmethod
+    def run(s: bytes, key: int) -> bytes:  # XOR every unpacked integer with key and a magic key, then repack
+        encoded = struct.unpack(ZiX12B.struct_format, s)
+        decoded = (
+            magic_key ^ key ^ part
+            for (magic_key, part) in zip(itertools.cycle(ZiX12B.magic_keys), encoded)  # magic keys repeat in order
+        )
+        return struct.pack(ZiX12B.struct_format, *decoded)
+
+
+versions: FrozenSet[Type[Version]] = frozenset({ZiX12B})
+
+
+class LoaderRequiredError(VersionSpecificRequirementUnmetError):
+    """An error where the user needs to provide `loader.pyo` to extract this type of archive."""
+
+    def __init__(self, path: str) -> None:
+        super().__init__(
+            f"To extract {ZiX12B.name} archives, the “{ZiX12B.loader}” file is required alongside the archive (we "
+            f"looked for it at “{path}”). You can find this file in the game you got the archive from, in the “renpy” "
+            f"directory.",
+            f"Copy the “{ZiX12B.loader}” file next to the archive you are trying to extract.",
+        )
+
+
+class IncorrectLoaderError(VersionSpecificRequirementUnmetError):  # the loader file lacks the expected code
+    def __init__(self) -> None:
+        super().__init__(
+            f"The provided “{ZiX12B.loader}” file does not appear to be the correct one. Please check it is from the "
+            "game this archive came from."
+        )
diff --git a/unrpa/view.py b/unrpa/view.py
new file mode 100644
index 0000000..f958cf4
--- /dev/null
+++ b/unrpa/view.py
@@ -0,0 +1,54 @@
+import io
+from typing import cast, Callable
+
+
+class ArchiveView:
+    """A read-only, length-limited view over one file's data inside an archive.
+
+    Data is served from ``prefix`` first (when non-empty) and then from ``archive`` starting at ``offset``;
+    at most ``length`` bytes are returned in total.
+    """
+
+    def __init__(
+        self, archive: io.BufferedIOBase, offset: int, length: int, prefix: bytes
+    ) -> None:
+        archive.seek(offset)  # the view reads from this handle directly, so position it first
+        self.remaining = length  # bytes this view may still return
+        self.sources = [archive]
+        if prefix:
+            # The prefix bytes are wrapped in a BytesIO and queued ahead of the archive.
+            self.sources.insert(0, cast(io.BufferedIOBase, io.BytesIO(prefix)))
+
+    def read(self, amount: int = -1) -> bytes:
+        """Return up to ``amount`` bytes via the current source's ``read``; negative means up to all that remain."""
+        return self.base_read(lambda source: source.read, amount)
+
+    def read1(self, amount: int = -1) -> bytes:
+        """Return up to ``amount`` bytes via the current source's ``read1``; negative means up to all that remain."""
+        return self.base_read(lambda source: source.read1, amount)
+
+    def base_read(
+        self, method: Callable[[io.BufferedIOBase], Callable[[int], bytes]], amount: int
+    ) -> bytes:
+        """Read from the first source that still has data, never exceeding the view's length.
+
+        ``method`` selects the bound read function (``read`` or ``read1``) of a source. Each call returns
+        data from one source only, so the result may be shorter than ``amount``. Returns b"" when no bytes
+        are requested or none remain.
+
+        Raises ``Exception`` when the sources run out while the view still expects more bytes.
+        """
+        if amount < 0 or amount > self.remaining:
+            amount = self.remaining
+        if amount == 0:
+            # A zero-byte read yields b"" from any source; treating that as end-of-source would drop live
+            # sources and then raise, so it is answered here.
+            return b""
+        while self.sources and self.remaining > 0:
+            segment = method(self.sources[0])(amount)
+            if segment:
+                self.remaining -= len(segment)
+                return segment
+            # Empty result for a non-zero request: this source is exhausted, fall through to the next.
+            self.sources.pop(0)
+        raise Exception("End of archive reached before the file should end.")