summaryrefslogtreecommitdiff
path: root/src/xpit_/issue.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/xpit_/issue.py')
-rw-r--r--src/xpit_/issue.py168
1 files changed, 168 insertions, 0 deletions
diff --git a/src/xpit_/issue.py b/src/xpit_/issue.py
new file mode 100644
index 0000000..53efeef
--- /dev/null
+++ b/src/xpit_/issue.py
@@ -0,0 +1,168 @@
+from __future__ import annotations
+from . import error
+Err = error.Err
+
+class Post:
+ def __init__(self):
+ self.meta: dict[bytes, bytes] = {}
+ self.title: bytes = b""
+ self.replies: list[Post] = []
+
+ def reply_get(self, needle: bytes) -> Post | None:
+ for reply in self.replies:
+ if b"id" not in reply.meta:
+ continue
+ if reply.meta[b"id"] == needle:
+ return reply
+ res = reply.reply_get(needle)
+ if res is not None:
+ return res
+ return None
+
+ def post_count(self) -> int:
+ count = 1
+ for reply in self.replies:
+ count += reply.post_count()
+ return count
+
+def is_newline(c: int) -> bool:
+ return (
+ c == ord('\n')
+ or c == ord('\r')
+ )
+
+def is_whitespace(c: int) -> bool:
+ return (
+ c == ord(' ')
+ or c == ord('\t')
+ or is_newline(c)
+ )
+
+def parse_ini(src: bytes | memoryview, i: int, res: dict[bytes, bytes]) -> int:
+ src = memoryview(src)
+
+ STATE_START = 0
+ STATE_KEY = 1
+ STATE_VALUE = 2
+ state = STATE_START
+
+ key = bytearray()
+ value = bytearray()
+
+ while i < len(src):
+ if state == STATE_START:
+ if src[i:i+4] == b"```\n":
+ return i + 4
+ elif src[i] == ord(' '):
+ pass
+ else:
+ state = STATE_KEY
+ continue
+ elif state == STATE_KEY:
+ if src[i] == ord('='):
+ state = STATE_VALUE
+ else:
+ key.append(src[i])
+ elif state == STATE_VALUE:
+ if is_newline(src[i]) or src[i] == ord(';'):
+ if src[i] == ord(';'):
+ while len(value) > 0 and is_whitespace(value[-1]):
+ value.pop()
+ while i < len(src) and not is_newline(src[i]):
+ i += 1
+ res[bytes(key)] = bytes(value)
+ state = STATE_START
+ key.clear()
+ value.clear()
+ else:
+ value.append(src[i])
+ else:
+ assert False, "unreachable: state enum not exhausted"
+
+ i += 1
+
+ return -1
+
+def parse(src: bytes | memoryview) -> tuple[Post, Err | None]:
+ def success(post: Post) -> tuple[Post, Err | None]:
+ return post, None
+ def fail(msg: str) -> tuple[Post, Err | None]:
+ return Post(), Err(msg)
+
+ src = memoryview(src)
+ issue = Post()
+ issue_title: bytes | None = None
+ i = 0
+
+ post = issue
+ while i < len(src):
+ if src[i:i+4] == b"\n---":
+ i += 4
+ post = Post()
+ continue
+ if src[i:i+7] == b"```ini\n":
+ i += 7
+ i = parse_ini(src, i, post.meta)
+ if i == -1:
+ return fail("ERR_FAILED_TO_PARSE_INI")
+ if b"reply" in post.meta:
+ reply = issue.reply_get(post.meta[b"reply"])
+ if reply is None:
+ return fail("ERR_REPLY_TO_NONEXISTENT_ID")
+ reply.replies.append(post)
+ elif post != issue:
+ issue.replies.append(post)
+ continue
+ if issue_title is None and src[i:i+2] == b"# ":
+ i += 2
+ title_start = i
+ while i < len(src):
+ if is_newline(src[i]):
+ break
+ i += 1
+ issue_title = bytes(src[title_start:i])
+ continue
+
+ i += 1
+
+ if issue_title is None:
+ return fail("ERR_ISSUE_NO_TITLE")
+ issue.title = issue_title
+
+ return success(issue)
+
+def clean_after_edit(src: bytes | memoryview, last_ini_prefix: bytes) -> bytes:
+ src = memoryview(src)
+ last_ini_prefix = bytes(last_ini_prefix)
+ res: list[int] = []
+ i = 0
+ last_ini_start = 0
+ while i < len(src):
+ while i < len(src):
+ if src[i:i+7] == b"```ini\n":
+ res.extend(src[i:i+7])
+ last_ini_start = len(res)
+ i += 7
+ break
+ res.append(src[i])
+ i += 1
+
+ while i < len(src):
+ # inside ini
+ if src[i:i+4] == b"\n```":
+ res.extend(src[i:i+4])
+ i += 4
+ break
+ elif src[i:i+3] == b" ; ":
+ i += 3
+ while i < len(src):
+ if is_newline(src[i]):
+ break
+ i += 1
+ res.append(src[i])
+ i += 1
+
+ res[last_ini_start:last_ini_start] = last_ini_prefix
+
+ return bytes(res)
+