from __future__ import annotations from . import error Err = error.Err class Post: def __init__(self): self.meta: dict[bytes, bytes] = {} self.title: bytes = b"" self.replies: list[Post] = [] def reply_get(self, needle: bytes) -> Post | None: for reply in self.replies: if b"id" not in reply.meta: continue if reply.meta[b"id"] == needle: return reply res = reply.reply_get(needle) if res is not None: return res return None def post_count(self) -> int: count = 1 for reply in self.replies: count += reply.post_count() return count def is_newline(c: int) -> bool: return ( c == ord('\n') or c == ord('\r') ) def is_whitespace(c: int) -> bool: return ( c == ord(' ') or c == ord('\t') or is_newline(c) ) def parse_ini(src: bytes | memoryview, i: int, res: dict[bytes, bytes]) -> int: src = memoryview(src) STATE_START = 0 STATE_KEY = 1 STATE_VALUE = 2 state = STATE_START key = bytearray() value = bytearray() while i < len(src): if state == STATE_START: if src[i:i+4] == b"```\n": return i + 4 elif src[i] == ord(' '): pass else: state = STATE_KEY continue elif state == STATE_KEY: if src[i] == ord('='): state = STATE_VALUE else: key.append(src[i]) elif state == STATE_VALUE: if is_newline(src[i]) or src[i] == ord(';'): if src[i] == ord(';'): while len(value) > 0 and is_whitespace(value[-1]): value.pop() while i < len(src) and not is_newline(src[i]): i += 1 res[bytes(key)] = bytes(value) state = STATE_START key.clear() value.clear() else: value.append(src[i]) else: assert False, "unreachable: state enum not exhausted" i += 1 return -1 def parse(src: bytes | memoryview) -> tuple[Post, Err | None]: def success(post: Post) -> tuple[Post, Err | None]: return post, None def fail(msg: str) -> tuple[Post, Err | None]: return Post(), Err(msg) src = memoryview(src) issue = Post() issue_title: bytes | None = None i = 0 post = issue while i < len(src): if src[i:i+4] == b"\n---": i += 4 post = Post() continue if src[i:i+7] == b"```ini\n": i += 7 i = parse_ini(src, i, post.meta) if i == -1: return fail("ERR_FAILED_TO_PARSE_INI") if b"reply" in post.meta: reply = issue.reply_get(post.meta[b"reply"]) if reply is None: return fail("ERR_REPLY_TO_NONEXISTENT_ID") reply.replies.append(post) elif post != issue: issue.replies.append(post) continue if issue_title is None and src[i:i+2] == b"# ": i += 2 title_start = i while i < len(src): if is_newline(src[i]): break i += 1 issue_title = bytes(src[title_start:i]) continue i += 1 if issue_title is None: return fail("ERR_ISSUE_NO_TITLE") issue.title = issue_title return success(issue) def clean_after_edit(src: bytes | memoryview, last_ini_prefix: bytes) -> bytes: src = memoryview(src) last_ini_prefix = bytes(last_ini_prefix) res: list[int] = [] i = 0 last_ini_start = 0 while i < len(src): while i < len(src): if src[i:i+7] == b"```ini\n": res.extend(src[i:i+7]) last_ini_start = len(res) i += 7 break res.append(src[i]) i += 1 while i < len(src): # inside ini if src[i:i+4] == b"\n```": res.extend(src[i:i+4]) i += 4 break elif src[i:i+3] == b" ; ": i += 3 while i < len(src): if is_newline(src[i]): break i += 1 res.append(src[i]) i += 1 res[last_ini_start:last_ini_start] = last_ini_prefix return bytes(res)