summaryrefslogtreecommitdiff
path: root/src/xpit_
diff options
context:
space:
mode:
Diffstat (limited to 'src/xpit_')
-rw-r--r--src/xpit_/args_helpers.py102
-rw-r--r--src/xpit_/create.py263
-rw-r--r--src/xpit_/error.py4
-rw-r--r--src/xpit_/identity.py59
-rw-r--r--src/xpit_/init.py54
-rw-r--r--src/xpit_/issue.py168
-rw-r--r--src/xpit_/issue_tracker.py52
-rw-r--r--src/xpit_/list_.py104
-rw-r--r--src/xpit_/xpit.py53
9 files changed, 859 insertions, 0 deletions
diff --git a/src/xpit_/args_helpers.py b/src/xpit_/args_helpers.py
new file mode 100644
index 0000000..1e79911
--- /dev/null
+++ b/src/xpit_/args_helpers.py
@@ -0,0 +1,102 @@
+from sys import stderr
+from . import error
+Err = error.Err
+
+def print_error(msg: str) -> None:
+ stderr.write(f"{msg}\n")
+
+def err_arg_unrecognized(arg: str) -> None:
+ print_error(f"Unrecognized argument: {arg}")
+
+ERR_UNRECOGNIZED_ARG = "ERR_UNRECOGNIZED_ARG"
+ERR_NOT_ENOUGH_ARGS = "ERR_NOT_ENOUGH_ARGS"
+
+def parse_arg(
+ argv: list[str],
+ i: int,
+ flag_value_map: dict[str, int]
+) -> tuple[str, list[str], int, Err | None]:
+ def success(
+ flag: str,
+ values: list[str],
+ new_i: int
+ ) -> tuple[str, list[str], int, Err | None]:
+ return flag, values, new_i, None
+ def fail(msg: str) -> tuple[str, list[str], int, Err | None]:
+ return "", [], 0, Err(msg)
+
+ flag: str
+ values: list[str] = []
+ arg: str = argv[i]
+ i += 1
+ if arg[0] != '-':
+ err_arg_unrecognized(arg)
+ return fail(ERR_UNRECOGNIZED_ARG)
+
+ value_index = 0
+ if arg[1] == '-':
+ # --flag
+ delimiter_index = arg.find('=')
+ if delimiter_index == -1:
+ flag = arg[2:]
+ value_index = 0
+ else:
+ flag = arg[2:delimiter_index]
+ values.append(arg[delimiter_index+1:])
+ value_index = 1
+
+ else:
+ # -f
+ flag = arg[1]
+ value_index = 0
+
+ if len(arg) != 2:
+ value = arg[2:]
+ if value[0] == '=':
+ value = value[1:]
+ values.append(value)
+ value_index += 1
+ target_flag_set_arr = None
+ value_count: int = 0
+ for flag_set in flag_value_map:
+ flag_set_arr = flag_set.split('|')
+ if flag in flag_set_arr:
+ target_flag_set_arr = flag_set_arr
+ value_count = flag_value_map[flag_set]
+ break
+ if target_flag_set_arr is None:
+ err_arg_unrecognized(flag)
+ return fail(ERR_UNRECOGNIZED_ARG)
+ while value_index < value_count:
+ if i >= len(argv):
+ print_error(f"Not enough arguments for flag '{flag}'")
+ return fail(ERR_NOT_ENOUGH_ARGS)
+ values.append(argv[i])
+ i += 1
+ value_index += 1
+
+ # NOTE: we always return the first flag in the flag-set for normalization
+ return success(target_flag_set_arr[0], values, i)
+
+def parse_generic(
+ flag_value_map: dict[str, int],
+ argv: list[str],
+ i: int
+) -> dict[str, list[str]] | None:
+ res: dict[str, list[str]] = {}
+ while i < len(argv):
+ arg = argv[i]
+ if len(arg) >= 2:
+ flag, values, i, err = parse_arg(argv, i, flag_value_map)
+ if err is not None:
+ return None
+
+ res[flag] = values
+ #if not callback(flag, values):
+ # return False
+ else:
+ err_arg_unrecognized(arg)
+ return None
+
+ return res
+
diff --git a/src/xpit_/create.py b/src/xpit_/create.py
new file mode 100644
index 0000000..30d32fa
--- /dev/null
+++ b/src/xpit_/create.py
@@ -0,0 +1,263 @@
+import os
+import subprocess
+import getpass
+from sys import argv, stdout, stderr
+from datetime import datetime, timezone
+from . import args_helpers, identity, issue, issue_tracker
+
+help_text = """\
+Usage: xpit create [options]
+Examples:
+ xpit create
+ xpit create -u Bob -i secret_bob_alias -r 20260101_120000:20260101_140000 -x
+ xpit create -e_ -t "Startup slow" -l performance,startup -d "The window takes 200ms to open."
+Options:
+ -d, --description <str>
+ The post body.
+ Defaults to 'Description goes here...'
+ -x, --disable-inline-annotations
+ Disable the explanatory ; annotations in newly created posts' headers.
+ -e, --editor <str>
+ Which editor to open the issue in.
+ Pass '_' (underscore) to not open an editor.
+ If not specified, the editor is chosen in the following order:
+ 1. $XPIT_EDITOR
+ 2. $GIT_EDITOR
+ 3. $EDITOR
+ 4. $VISUAL
+ 5.
+ windows: "notepad"
+ other: "vi"
+ -i, --identity <str>
+ Label for keypair used for unique identification and signature.
+ This is used to uniquely identify a user.
+ Keys are stored in '~/.config/xpit/identities/'.
+ Defaults to 'default'.
+ -l, --labels <str>
+ Comma-separated list of labels applicable to this issue.
+ Not compatible with --reply-to.
+ Empty by default.
+ -r, --reply-to <ISSUE_ID>[:LOCAL_ID]
+ Reply to an issue or a specific reply within an issue.
+ -P, --skip-prompt
+ Skip interactive prompts for missing fields (title, description, labels).
+ Only applies when using `-e_` (no automatic editor).
+ -t, --title <str>
+ The issue's title.
+ Not compatible with --reply-to.
+ Defaults to 'Title'.
+ -u, --username <str>
+ The username to use for the issue.
+ If not specified, uses `getpass.getuser()`.
+ -h, --help
+ Print this help text.
+"""
+
+class Args:
+ def __init__(self):
+ self.config_dir: str
+ self.editor: str
+ self.username: str
+ self.identity: str = "default"
+ self.reply_to: str | None = None
+ self.disable_inline_annotations: bool = False
+
+ self.labels: bytes = b""
+ self.skip_prompt: bool = False
+ self.description: bytes = b"Description goes here..."
+ self.title: bytes = b"Title"
+
+ def parse(self) -> bool:
+ flag_value_map = {
+ "help|h": 0,
+ "description|d": 1,
+ "disable-inline-annotations|x": 0,
+ "editor|e": 1,
+ "identity|i": 1,
+ "labels|l": 1,
+ "reply|r": 1,
+ "skip-prompt|P": 0,
+ "title|t|": 1,
+ "username|u": 1,
+ }
+ res = args_helpers.parse_generic(flag_value_map, argv, 2)
+ _editor: str | None = None
+ _username: str | None = getpass.getuser()
+ if res is None:
+ return False
+ if "help" in res:
+ stdout.write(help_text)
+ return False
+ if "description" in res:
+ self.description = res["description"][0].encode("utf-8")
+ if "disable-inline-annotations" in res:
+ self.disable_inline_annotations = True
+ if "editor" in res:
+ _editor = res["editor"][0]
+ if "identity" in res:
+ self.identity = res["identity"][0]
+ if "labels" in res:
+ labels: str = res["labels"][0]
+ issue_tracker.load_config()
+ for label in labels.split(','):
+ if label.encode("utf-8") not in issue_tracker.config[b"allowed_labels"]:
+ stderr.write(
+ f"Label '{label}' not allowed.\nAllowed: "
+ +issue_tracker.config[b"allowed_labels"].decode("utf-8")
+ +'\n')
+ return False
+ self.labels = labels.encode("utf-8")
+ if "reply" in res:
+ self.reply_to = res["reply"][0]
+ if "skip-prompt" in res:
+ self.skip_prompt = True
+ if "title" in res:
+ self.title = res["title"][0].encode("utf-8")
+ if "username" in res:
+ _username = res["username"][0]
+
+ if _username is None:
+ stderr.write("No username.")
+ return False
+ self.username = _username
+
+ if _editor is None:
+ _editor = (
+ os.environ.get("XPIT_EDITOR")
+ or os.environ.get("GIT_EDITOR")
+ or os.environ.get("EDITOR")
+ or os.environ.get("VISUAL")
+ or None
+ )
+ if _editor is None:
+ if os.name == "nt":
+ _editor = "notepad"
+ else:
+ _editor = "vi"
+ self.editor = _editor
+
+ if os.name == "nt":
+ appdata = os.getenv("APPDATA")
+ if appdata is None:
+ stderr.write("Couldn't find $APPDATA directory")
+ return False
+ self.config_dir = os.path.join(appdata, "xpit")
+ else:
+ home = os.getenv("HOME")
+ if home is None:
+ stderr.write("Couldn't find $HOME directory")
+ return False
+ self.config_dir = os.path.join(home, ".config", "xpit")
+
+ return True
+args = Args()
+
+def id_gen() -> str:
+ return datetime.now(timezone.utc).strftime(issue_tracker.config[b"id_format"].decode("utf-8"))
+
+def main() -> int:
+ if len(argv) < 2:
+ stdout.write(help_text)
+ return 1
+
+ if not args.parse():
+ return 1
+
+ identities_dir = os.path.join(args.config_dir, "identities")
+ private_key, public_key = identity.load(identities_dir, args.identity)
+ _ = private_key
+
+ issue_tracker.load_config()
+ if issue_tracker.dir is None:
+ stderr.write("xpit not initialized. Run `xpit init`.\n")
+ return 1
+
+ if args.reply_to is None:
+ issue_id = id_gen()
+ else:
+ issue_id = args.reply_to.split(":", 1)[0]
+ issue_dir = os.path.join(issue_tracker.dir, issue_id)
+ if args.reply_to is None:
+ if os.path.exists(issue_dir):
+ stderr.write(f"Failed to create issue {issue_id}: Already exists.\n")
+ return 1
+ os.mkdir(issue_dir)
+
+ issue_tracker_path = os.path.join(issue_dir, "issue.md")
+ if args.reply_to is not None:
+ if not os.path.isfile(issue_tracker_path):
+ stderr.write(f"Issue {issue_id} not found.\n")
+ return 1
+ with open(issue_tracker_path, "r+b") as f:
+ src = f.read()
+ info, err = issue.parse(src)
+ if err is not None:
+ stderr.write(err.msg)
+ return 1
+ if info.meta[b"status"] != b"open":
+ stderr.write(f"Issue {issue_id} already closed.\n")
+ return 1
+
+ identity_id = identity.id_from_pubkey(public_key)
+ # help_author = b" ; your user-ID"
+ # help_status = b" ; open | closed"
+ allowed_labels = issue_tracker.config[b"allowed_labels"]
+ help_labels = b" ; comma-separated list. Allowed labels are: "+allowed_labels
+ # help_id = b" ; this reply's local ID"
+ # help_reply = b" ; local ID of the post you're replying to"
+ if args.disable_inline_annotations:
+ help_labels = b""
+ if args.editor == "_" and not args.skip_prompt:
+ if args.title is None:
+ args.title = input("Title: ").encode("utf-8")
+ if args.labels is None:
+ args.labels = input("Labels (Comma-separated): ").encode("utf-8")
+ if args.description is None:
+ args.description = input("Description: ").encode("utf-8")
+ with open(issue_tracker_path, "ab") as f:
+ ini_author = b"author=" + args.username.encode("utf-8") + b'#' + identity_id + b'\n'
+ if args.reply_to is None:
+ ini_status = b"status=open\n"
+ ini_labels = b"labels="+args.labels + help_labels + b'\n'
+ ini_add_after_edit = ini_author + ini_status
+ ini_add_now = ini_labels
+ if args.editor == "_":
+ ini_add_now = ini_add_after_edit + ini_add_now
+ msg = b"```ini\n"+ini_add_now+b"""```
+
+# """+args.title+b"""
+
+"""+args.description+b"""
+
+"""
+ else:
+ ini_id = b"id="+id_gen().encode("utf-8")+b'\n'
+ reply_to = args.reply_to.split(":", 2)
+ ini_reply = b""
+ if len(reply_to) == 2:
+ ini_reply = b"reply="+reply_to[1].encode("utf-8")+b'\n'
+ ini_add_after_edit = ini_author + ini_id
+ ini_add_now = ini_reply
+ if args.editor == "_":
+ ini_add_now = ini_add_after_edit + ini_add_now
+ msg = b"---\n```ini\n"+ini_add_now+b"```\n\n"+args.description+b"\n\n"
+ f.write(msg)
+
+ if args.editor != "_":
+ subprocess.call([args.editor, issue_tracker_path])
+ with open(issue_tracker_path, "r+b") as f:
+ src = f.read()
+ src = issue.clean_after_edit(src, ini_add_after_edit)
+ f.seek(0)
+ f.write(src)
+ f.truncate()
+ # TODO check if labels ok. Maybe we want to have some sort of `check` command
+
+ relpath = os.path.relpath(issue_tracker_path, os.getcwd())
+ if args.reply_to is None:
+ stdout.write(f"Created new issue ./{relpath}\n")
+ else:
+ stdout.write(f"Replied to {args.reply_to}\n")
+
+ return 0
+
diff --git a/src/xpit_/error.py b/src/xpit_/error.py
new file mode 100644
index 0000000..12cf084
--- /dev/null
+++ b/src/xpit_/error.py
@@ -0,0 +1,4 @@
+class Err:
+ def __init__(self, msg: str):
+ self.msg = msg
+
diff --git a/src/xpit_/identity.py b/src/xpit_/identity.py
new file mode 100644
index 0000000..2786b2d
--- /dev/null
+++ b/src/xpit_/identity.py
@@ -0,0 +1,59 @@
+import os
+import base64
+from sys import stdout
+from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives.asymmetric import ed25519
+from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes, PublicKeyTypes
+
+# TODO guard against bad file access
+def create(path: str) -> None:
+ private_key = ed25519.Ed25519PrivateKey.generate()
+ public_key = private_key.public_key()
+ enc = serialization.NoEncryption()
+
+ os.mkdir(path)
+
+ data = private_key.private_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PrivateFormat.PKCS8,
+ encryption_algorithm=enc,
+ )
+ with open(os.path.join(path, "private.pem"), "wb") as f:
+ f.write(data)
+
+ data = public_key.public_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PublicFormat.SubjectPublicKeyInfo,
+ )
+ with open(os.path.join(path, "public.pub"), "wb") as f:
+ f.write(data)
+
+def load(identities_dir: str, identity: str) -> tuple[PrivateKeyTypes, PublicKeyTypes]:
+ if not os.path.isdir(identities_dir):
+ os.makedirs(identities_dir)
+
+ path = os.path.join(identities_dir, identity)
+
+ if not os.path.isdir(path):
+ stdout.write(f"Generating new identity '{identity}'...\n")
+ create(path)
+ stdout.write(f"Created new identity '{identity}' at: {path}\n")
+
+ with open(os.path.join(path, "private.pem"), "rb") as f:
+ private_key_data = f.read()
+ with open(os.path.join(path, "public.pub"), "rb") as f:
+ public_key_data = f.read()
+
+ return (
+ serialization.load_pem_private_key(private_key_data, password=None),
+ serialization.load_pem_public_key(public_key_data)
+ )
+
+def id_from_pubkey(pubkey: PublicKeyTypes) -> bytes:
+ der = pubkey.public_bytes(
+ encoding=serialization.Encoding.DER,
+ format=serialization.PublicFormat.SubjectPublicKeyInfo,
+ )
+ b64 = base64.b64encode(der)
+ return b64[16:24]
+
diff --git a/src/xpit_/init.py b/src/xpit_/init.py
new file mode 100644
index 0000000..3663f4f
--- /dev/null
+++ b/src/xpit_/init.py
@@ -0,0 +1,54 @@
+import os
+from sys import argv, stdout, stderr
+from . import args_helpers, issue_tracker
+
+help_text = """\
+Usage: xpit init
+Options:
+ -h, --help
+ Print this help text.
+"""
+
+class Args:
+ def __init__(self):
+ pass
+
+ def parse(self) -> bool:
+ flag_value_map = {
+ "help|h": 0,
+ }
+ res = args_helpers.parse_generic(flag_value_map, argv, 2)
+ if res is None:
+ return False
+ if "help" in res:
+ stdout.write(help_text)
+ return False
+
+ return True
+args = Args()
+
+def main() -> int:
+ if len(argv) < 2:
+ stdout.write(help_text)
+ return 1
+
+ if not args.parse():
+ return 1
+
+ if issue_tracker.find_dir(os.getcwd(), suppress_warning=True) is not None:
+ stderr.write("xpit already initialized\n")
+ return 1
+ os.mkdir("issue-tracker")
+ config_file_path = os.path.join("issue-tracker", "_config.ini")
+ with open(config_file_path, "wb") as f:
+ f.write(b"""\
+id_format=%Y%m%d_%H%M%S ; `man strftime` for format
+allowed_labels=bug,breaking,contributor-friendly,docs,use-case,regression,proposal ; labels that are allowed to be used in issues. (Comma-separated list)
+\n""")
+ stdout.write(f"""\
+Successfully initialized at ./issue-tracker.
+Configure in {config_file_path}
+""")
+
+ return 0
+
diff --git a/src/xpit_/issue.py b/src/xpit_/issue.py
new file mode 100644
index 0000000..53efeef
--- /dev/null
+++ b/src/xpit_/issue.py
@@ -0,0 +1,168 @@
+from __future__ import annotations
+from . import error
+Err = error.Err
+
+class Post:
+ def __init__(self):
+ self.meta: dict[bytes, bytes] = {}
+ self.title: bytes = b""
+ self.replies: list[Post] = []
+
+ def reply_get(self, needle: bytes) -> Post | None:
+ for reply in self.replies:
+ if b"id" not in reply.meta:
+ continue
+ if reply.meta[b"id"] == needle:
+ return reply
+ res = reply.reply_get(needle)
+ if res is not None:
+ return res
+ return None
+
+ def post_count(self) -> int:
+ count = 1
+ for reply in self.replies:
+ count += reply.post_count()
+ return count
+
+def is_newline(c: int) -> bool:
+ return (
+ c == ord('\n')
+ or c == ord('\r')
+ )
+
+def is_whitespace(c: int) -> bool:
+ return (
+ c == ord(' ')
+ or c == ord('\t')
+ or is_newline(c)
+ )
+
+def parse_ini(src: bytes | memoryview, i: int, res: dict[bytes, bytes]) -> int:
+ src = memoryview(src)
+
+ STATE_START = 0
+ STATE_KEY = 1
+ STATE_VALUE = 2
+ state = STATE_START
+
+ key = bytearray()
+ value = bytearray()
+
+ while i < len(src):
+ if state == STATE_START:
+ if src[i:i+4] == b"```\n":
+ return i + 4
+ elif src[i] == ord(' '):
+ pass
+ else:
+ state = STATE_KEY
+ continue
+ elif state == STATE_KEY:
+ if src[i] == ord('='):
+ state = STATE_VALUE
+ else:
+ key.append(src[i])
+ elif state == STATE_VALUE:
+ if is_newline(src[i]) or src[i] == ord(';'):
+ if src[i] == ord(';'):
+ while len(value) > 0 and is_whitespace(value[-1]):
+ value.pop()
+ while i < len(src) and not is_newline(src[i]):
+ i += 1
+ res[bytes(key)] = bytes(value)
+ state = STATE_START
+ key.clear()
+ value.clear()
+ else:
+ value.append(src[i])
+ else:
+ assert False, "unreachable: state enum not exhausted"
+
+ i += 1
+
+ return -1
+
+def parse(src: bytes | memoryview) -> tuple[Post, Err | None]:
+ def success(post: Post) -> tuple[Post, Err | None]:
+ return post, None
+ def fail(msg: str) -> tuple[Post, Err | None]:
+ return Post(), Err(msg)
+
+ src = memoryview(src)
+ issue = Post()
+ issue_title: bytes | None = None
+ i = 0
+
+ post = issue
+ while i < len(src):
+ if src[i:i+4] == b"\n---":
+ i += 4
+ post = Post()
+ continue
+ if src[i:i+7] == b"```ini\n":
+ i += 7
+ i = parse_ini(src, i, post.meta)
+ if i == -1:
+ return fail("ERR_FAILED_TO_PARSE_INI")
+ if b"reply" in post.meta:
+ reply = issue.reply_get(post.meta[b"reply"])
+ if reply is None:
+ return fail("ERR_REPLY_TO_NONEXISTENT_ID")
+ reply.replies.append(post)
+ elif post != issue:
+ issue.replies.append(post)
+ continue
+ if issue_title is None and src[i:i+2] == b"# ":
+ i += 2
+ title_start = i
+ while i < len(src):
+ if is_newline(src[i]):
+ break
+ i += 1
+ issue_title = bytes(src[title_start:i])
+ continue
+
+ i += 1
+
+ if issue_title is None:
+ return fail("ERR_ISSUE_NO_TITLE")
+ issue.title = issue_title
+
+ return success(issue)
+
+def clean_after_edit(src: bytes | memoryview, last_ini_prefix: bytes) -> bytes:
+ src = memoryview(src)
+ last_ini_prefix = bytes(last_ini_prefix)
+ res: list[int] = []
+ i = 0
+ last_ini_start = 0
+ while i < len(src):
+ while i < len(src):
+ if src[i:i+7] == b"```ini\n":
+ res.extend(src[i:i+7])
+ last_ini_start = len(res)
+ i += 7
+ break
+ res.append(src[i])
+ i += 1
+
+ while i < len(src):
+ # inside ini
+ if src[i:i+4] == b"\n```":
+ res.extend(src[i:i+4])
+ i += 4
+ break
+ elif src[i:i+3] == b" ; ":
+ i += 3
+ while i < len(src):
+ if is_newline(src[i]):
+ break
+ i += 1
+ res.append(src[i])
+ i += 1
+
+ res[last_ini_start:last_ini_start] = last_ini_prefix
+
+ return bytes(res)
+
diff --git a/src/xpit_/issue_tracker.py b/src/xpit_/issue_tracker.py
new file mode 100644
index 0000000..20c1662
--- /dev/null
+++ b/src/xpit_/issue_tracker.py
@@ -0,0 +1,52 @@
+import os
+import sys
+from . import issue
+from . import error
+Err = error.Err
+
+class State:
+ def __init__(self):
+ self.dir: str | None = None
+ self.config: dict[bytes, bytes] | None = None
+ self.allowed_labels: list[bytes] | None = None
+state = State()
+
+def find_dir(start_dir: str, suppress_warning: bool = False) -> str | None:
+ cur = os.path.abspath(start_dir)
+
+ while True:
+ candidate = os.path.join(cur, "issue-tracker")
+ if os.path.isdir(candidate):
+ return candidate
+
+ parent = os.path.dirname(cur)
+ if parent == cur: # reached filesystem root
+ if not suppress_warning:
+ sys.stderr.write(
+ "xpit has not been initialized. `xpit init` to initialize.\n"
+ )
+ return None
+ cur = parent
+
+def load_config() -> Err | None:
+ if state.dir is None:
+ state.dir = find_dir(os.getcwd())
+ if state.dir is None:
+ return Err("ERR_COULD_NOT_FIND_ISSUE_TRACKER_DIR")
+ with open(os.path.join(state.dir, "_config.ini"), "rb") as f:
+ src = f.read()
+ state.config = {}
+ issue.parse_ini(src, 0, state.config)
+ if b"allowed_labels" not in state.config:
+ sys.stdout.write("Warning: _config.ini has no allowed_labels\n")
+ state.config[b"allowed_labels"] = b""
+ if b"id_format" not in state.config:
+ sys.stdout.write("Warning: _config.ini has no id_format. Defaulting to '%Y%m%d_%H%M%S'\n")
+ state.config[b"id_format"] = b"%Y%m%d_%H%M%S"
+ state.allowed_labels = state.config[b"allowed_labels"].split(b',')
+
+ return None
+
+def __getattr__(name):
+ return getattr(state, name)
+
diff --git a/src/xpit_/list_.py b/src/xpit_/list_.py
new file mode 100644
index 0000000..fe281c2
--- /dev/null
+++ b/src/xpit_/list_.py
@@ -0,0 +1,104 @@
+import os
+from sys import argv, stdout, stderr
+from . import args_helpers, issue, issue_tracker
+
+DEFAULT_ORDER: list[bytes] = [b"ID", b"POST_COUNT", b"TITLE", b"status", b"labels"]
+
+help_text = f"""\
+Usage: xpit list [options]
+Examples:
+ xpit list -d , -f TITLE,POST_COUNT
+Options:
+ -d, --delimiter <str>
+ Delimiter used to separate the fields.
+ Defaults to '|' (pipe).
+ -f, --fields <FIELD,...>
+ Defaults to '{b','.join(DEFAULT_ORDER).decode("utf-8")}'.
+ Guaranteed fields:
+ ID - ID of the issue
+ TITLE - Title of the issue
+ AUTHOR - Author of the issue
+ POST_COUNT - Total number of posts in the issue, including the initial post
+ PATH_ABS - absolute path to issue.md file
+ PATH_REL - relative path to issue.md file
+ Other fields are optional and parsed in issues' INI headers.
+ The fields are printed in order and the same field can be printed more than once.
+ -h, --help
+ Print this help text.
+"""
+
+class Args:
+ def __init__(self) -> None:
+ self.order = DEFAULT_ORDER
+ self.delimiter = '|'
+
+ def parse(self) -> bool:
+ flag_value_map = {
+ "help|h": 0,
+ "delimiter|d": 1,
+ "fields|f": 1,
+ }
+ res = args_helpers.parse_generic(flag_value_map, argv, 2)
+ if res is None:
+ return False
+ if "help" in res:
+ stdout.write(help_text)
+ return False
+ if "delimiter" in res:
+ self.delimiter = res["delimiter"][0]
+ if "fields" in res:
+ self.order = [field.encode("utf-8") for field in res["fields"][0].split(',')]
+
+ return True
+args = Args()
+
+def main() -> int:
+ if len(argv) < 2:
+ stdout.write(help_text)
+ return 1
+
+ if not args.parse():
+ return 1
+
+ cwd = os.getcwd()
+ issue_tracker_dir = issue_tracker.find_dir(cwd)
+ if issue_tracker_dir is None:
+ return 1
+ for file_name in os.listdir(issue_tracker_dir):
+ if not os.path.isdir(os.path.join(issue_tracker_dir, file_name)):
+ continue
+
+ file_path = os.path.join(issue_tracker_dir, file_name, "issue.md")
+ with open(file_path, "rb") as f:
+ src = f.read()
+ info, err = issue.parse(src)
+ if err is not None:
+ stderr.write(err.msg)
+ return 1
+ i = 0
+ while True:
+ key = args.order[i]
+ if key == b"ID":
+ stdout.write(file_name)
+ elif key == b"TITLE":
+ stdout.write(info.title.decode("utf-8"))
+ elif key == b"POST_COUNT":
+ stdout.write(str(info.post_count()))
+ elif key == b"AUTHOR":
+ stdout.write(info.meta[b"author"].decode("utf-8"))
+ elif key == b"PATH_ABS":
+ stdout.write(file_path)
+ elif key == b"PATH_REL":
+ stdout.write(os.path.relpath(file_path, start=cwd))
+ else:
+ if key in info.meta:
+ stdout.write(info.meta[key].decode("utf-8"))
+
+ i += 1
+ if i >= len(args.order):
+ break
+ stdout.write(args.delimiter)
+ stdout.write('\n')
+
+ return 0
+
diff --git a/src/xpit_/xpit.py b/src/xpit_/xpit.py
new file mode 100644
index 0000000..f6eefd0
--- /dev/null
+++ b/src/xpit_/xpit.py
@@ -0,0 +1,53 @@
+from sys import argv, stdout
+from . import args_helpers, init, create, list_
+
+help_text = """\
+xpit - cross-platform issue tracker
+Usage: xpit <COMMAND> [options]
+<COMMAND>:
+ init:
+ Initialize issue tracker in current directory.
+ create:
+ Create new issue or reply.
+ list:
+ List issues.
+Options:
+ -h, --help
+ Print this help text. `xpit <COMMAND> -h` for help with specific command.
+"""
+
+class Args:
+ def __init__(self) -> None:
+ pass
+
+ def parse(self) -> bool:
+ flag_value_map = {
+ "help|h": 0,
+ }
+ res = args_helpers.parse_generic(flag_value_map, argv, 1)
+ if res is None:
+ return False
+ if "help" in res:
+ stdout.write(help_text)
+ return False
+
+ return True
+args = Args()
+
+def main() -> int:
+ if len(argv) <= 1:
+ stdout.write(help_text)
+ return 1
+
+ elif argv[1] == "init":
+ return init.main()
+ elif argv[1] == "create":
+ return create.main()
+ elif argv[1] == "list":
+ return list_.main()
+ else:
+ if not args.parse():
+ return 1
+
+ return 0
+