diff options
author | David Timber <dxdt@dev.snart.me> | 2024-05-01 00:37:21 +0900 |
---|---|---|
committer | David Timber <dxdt@dev.snart.me> | 2024-05-01 00:37:21 +0900 |
commit | 0881bc4df7ad5225da10e0028fdf73e95c8fa943 (patch) | |
tree | 0fcbc134c87e6a4579d29272634f0af83699afd4 |
Initial commit
-rw-r--r-- | .gitignore | 4 | ||||
-rw-r--r-- | .vscode/launch.json | 17 | ||||
-rw-r--r-- | doc/config.jsonc | 13 | ||||
-rw-r--r-- | pyproject.toml | 24 | ||||
-rw-r--r-- | src/okkybot/__init__.py | 70 | ||||
-rw-r--r-- | src/okkybot/__main__.py | 277 |
6 files changed, 405 insertions, 0 deletions
# ===========================================================================
# Reconstruction of a cgit "Initial commit" diff dump. The four non-Python
# files are reproduced below as comments; the two Python modules follow,
# reformatted, with concrete fixes marked "FIX:".
#
# --- a/.gitignore (new file, mode 100644) ---
#   __pycache__
#
#   /src/cache.json
#   /src/config.jsonc
#
# --- a/.vscode/launch.json (new file, mode 100644) ---
#   {
#       // Use IntelliSense to learn about possible attributes.
#       // Hover to view descriptions of existing attributes.
#       // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
#       "version": "0.2.0",
#       "configurations": [
#           {
#               "name": "okkybot",
#               "type": "debugpy",
#               "request": "launch",
#               "cwd": "${workspaceFolder}/src",
#               "module": "okkybot",
#               "args": [],
#               "justMyCode": true
#           },
#       ]
#   }
#
# --- a/doc/config.jsonc (new file, mode 100644) ---
#   {
#       "login": {
#           "id": "",
#           "password": ""
#       },
#       "api_keys": {
#           "chatgpt": ""
#       },
#       "marker": {
#           "secret": "",
#           "algorithm": "HS256"
#       }
#   }
#
# --- a/pyproject.toml (new file, mode 100644) ---
#   [build-system]
#   requires = ["hatchling"]
#   build-backend = "hatchling.build"
#
#   [project]
#   name = "okkybot"
#   version = "0.0.0"
#   authors = [
#       { name="David Timber", email="dxdt@dev.snart.me" },
#   ]
#   description = "okkybot"
#   readme = "README.md"
#   requires-python = ">= 3.10"
#   dependencies = [ "pyjson5", "openai", "pyjwt[crypto]" ]
#
#   classifiers = [
#       "Programming Language :: Python :: 3",
#       "License :: OSI Approved :: Apache Software License",
#       "Operating System :: OS Independent",
#   ]
#
#   [project.urls]
#   Homepage = "https://github.com/kenu/funjava/"
#   Issues = "https://github.com/kenu/funjava/issues"
# ===========================================================================

# ===========================================================================
# --- a/src/okkybot/__init__.py (new file, mode 100644) ---
# ===========================================================================
from enum import Enum
from html.parser import HTMLParser
from typing import Any
from urllib.parse import parse_qs, urlparse
import requests.cookies
import requests.sessions


class StateCache:
    """Persistent bot state: per-topic progress plus the session cookie jar.

    Round-trips through a plain dict (``get_dict`` / the ``saved`` ctor
    argument) so it can be serialized to the JSON cache file.
    """

    class TopicState:
        """Progress marker for one topic: the highest post id processed."""

        # FIX: the original default was a shared mutable dict
        # (``saved = dict[str, Any]()``); use None as the sentinel instead.
        def __init__(self, saved: dict[str, Any] | None = None):
            saved = saved if saved is not None else {}
            self.last_post = saved.get("last_post", 0)

        def get_dict(self) -> dict[str, Any]:
            """Return the serializable form of this state."""
            return {
                "last_post": self.last_post
            }

    # NOTE(review): unimplemented stub, never called in the visible code;
    # kept as-is for interface compatibility. TODO: implement or remove.
    def parseCookies(doc: dict) -> requests.cookies.RequestsCookieJar:
        pass

    # FIX: the original default was the *type* ``dict[str, Any]`` (not an
    # instance), so ``StateCache()`` — as called by getCache() on a missing
    # cache file — crashed on ``saved.get(...)``.
    def __init__(self, saved: dict[str, Any] | None = None):
        saved = saved if saved is not None else {}
        # Maps topic name -> TopicState.
        self.topics: dict[str, StateCache.TopicState] = {}
        self.cookies = requests.sessions.cookiejar_from_dict(
            saved.get("cookies", dict()))

        for k, v in saved.get("topics", dict()).items():
            self.topics[k] = StateCache.TopicState(v)

    def get_dict(self) -> dict[str, Any]:
        """Return the serializable form of the whole cache."""
        ret = dict[str, Any]()

        if self.topics:
            d = ret["topics"] = dict[str, Any]()
            for k, v in self.topics.items():
                d[k] = v.get_dict()

        ret["cookies"] = self.cookies.get_dict()

        return ret


class InnerHTMLExtractor(HTMLParser):
    """Collects all text nodes of an HTML document into ``self.data``."""

    def __init__(self):
        super().__init__()
        self.data = list[str]()

    def handle_data(self, data: str):
        self.data.append(data)


class JWTMarkerExtractor(HTMLParser):
    """Extracts okkybot marker JWTs from ``<a href="?okkybot-marker=...">``
    links (host-less, path-less hrefs only) into ``self.marker``."""

    def __init__(self):
        super().__init__()
        self.marker = list[str]()

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]):
        if tag.lower() != "a":
            return
        for kv in attrs:
            if kv[0] == "href":
                u = urlparse(kv[1])
                # FIX: urlparse() yields hostname None (not "") for relative
                # URLs, so the original ``u.hostname != ""`` test skipped
                # every marker link. Truthiness covers both None and "".
                if u.hostname or u.path:
                    continue

                qs = parse_qs(u.query)
                self.marker.extend(qs.get("okkybot-marker", []))


class ProcPostResult(Enum):
    """Canonical result strings for processPost()."""
    UNVIABLE = "Unviable"
    YES = "yes"
    NO = "no"
    MARKER = "marker"


# ===========================================================================
# --- a/src/okkybot/__main__.py (new file, mode 100644) ---
# NOTE(review): separate module in the original tree; its
# ``from okkybot import ...`` refers to the package above.
# ===========================================================================
import html
import os
import sys
import urllib.parse
import uuid

import jwt
import openai
import pyjson5
import requests

# FIX: was ``from typing import *``; only Any is used.
from typing import Any

from okkybot import \
    InnerHTMLExtractor, JWTMarkerExtractor, ProcPostResult, StateCache

VERSION = "0"
USER_AGENT = "Okkybot/{v} (+https://github.com/dxdxdt/okkybot)".format(
    v = VERSION)

API_ENDPOINT = "https://okky.kr/api/okky-web"
CACHE_FILENAME = "cache.json"
CONFIG_FILENAME = "config.jsonc"
MAX_POSTS_PER_TOPIC = 50
TARGET_TOPICS = [ "community" ]
POST_TOKEN_LIMIT = 5000  # $0.0025 spending limit per post

dryrun = True


def getCache() -> StateCache:
    """Load the state cache from disk, or return a fresh one if absent."""
    try:
        with open(CACHE_FILENAME) as f:
            doc = pyjson5.loads(f.read())
    except FileNotFoundError:
        return StateCache()
    return StateCache(doc)


def saveCache(c: StateCache):
    """Persist the state cache to disk."""
    doc = c.get_dict()
    with open(CACHE_FILENAME, "w") as f:
        f.write(pyjson5.dumps(doc))


def getOpenaiAPIKey() -> str:
    global conf
    return conf.get("api_keys", dict()).get("chatgpt")


def checkSession(c: StateCache, s: requests.Session) -> bool:
    """Return True when the saved cookies still map to a signed-in session."""
    with s.get(
            "https://okky.kr/settings/profile",
            allow_redirects = False) as rsp:
        rsp.raise_for_status()

        # FIX: the original used ``status_code % 100`` (200 % 100 == 0), so
        # the check could never succeed. The status *class* is
        # ``status_code // 100``.
        return rsp.status_code // 100 == 2


def doSignin(c: StateCache, s: requests.Session):
    """Log out any stale session, then sign in with configured credentials."""
    global conf
    body = conf["login"]

    with s.get("https://okky.kr/api/okky-web/auth/logout"): pass
    with s.post(
            "https://okky.kr/api/okky-web/auth/login",
            json = body) as rsp:
        rsp.raise_for_status()


def fetchAPIData(url: str, s: requests.Session) -> str:
    with s.get(url) as rsp:
        return rsp.text


def getPostList(url: str, s: requests.Session) -> list[dict[str, Any]]:
    doc = pyjson5.loads(fetchAPIData(url, s))
    return doc["content"]


def fetchPostData(url, s: requests.Session) -> dict[str, Any]:
    doc = pyjson5.loads(fetchAPIData(url, s))
    return doc


def issueMarkerJWT() -> str:
    """Mint a signed marker token proving a comment was left by this bot."""
    global conf
    mc = conf["marker"]

    # FIX: local renamed from ``id`` to avoid shadowing the builtin.
    jti = str(uuid.uuid4())
    payload = {
        "iss": "okkybot",
        "sub": "marker",
        "jti": jti
    }

    return jwt.encode(payload, mc["secret"], algorithm = mc["algorithm"])


def validateMarkerJWT(token: str) -> bool:
    """Verify a marker token's signature and claims; raises on bad tokens."""
    global conf
    mc = conf["marker"]

    # FIX: PyJWT's ``algorithms`` parameter takes a *list* of names.
    payload = jwt.decode(token, mc["secret"], algorithms = [mc["algorithm"]])
    return payload["iss"] == "okkybot" and payload["sub"] == "marker"


def writeComment(pid: int, result: str, s: requests.Session):
    """Post the verdict as a comment, embedding a hidden marker link."""
    global dryrun

    # FIX: the original format string had no {marker} placeholder, so the
    # JWT was never embedded in the href.
    marker_href = '''?okkybot-marker={marker}'''.format(
        marker = urllib.parse.quote(issueMarkerJWT()))

    text = ""
    text += '''<p>킁킁. AI는 이 글이 정치적이라고 생각합니다:</p>'''
    text += '''<blockquote><p>{result}</p></blockquote>'''.format(
        result = html.escape(result))
    # FIXME: gets filtered by the server ...
    # either the "hidden" attribute is the problem,
    # or the server follows hrefs directly,
    # or it apparently must be a fully-qualified URL
    text += '''<a hidden href="{href}">'''.format(href = marker_href)

    body = {
        "targetId": str(pid),
        "note": {
            "text": text
        },
        "textType": "HTML",
        "voted": 0
    }

    print({ "action": "comment", "data": body })

    if not dryrun:
        with s.post(
                "{api}/comments".format(api = API_ENDPOINT),
                json = body) as req:
            req.raise_for_status()


def stripHTML(x: str) -> list[str]:
    """Return the text nodes of an HTML fragment."""
    parser = InnerHTMLExtractor()
    parser.feed(x)
    return parser.data


def determineViability(x: list[str]) -> bool:
    """Crude token count to cap per-post LLM spending."""
    tokens = list[str]()
    for line in x:
        tokens.extend(line.split())

    return len(tokens) <= POST_TOKEN_LIMIT


def doLLMPrompt(title: str, stripped_body: list[str]):
    """Ask the model for a verdict; returns its text, or None if no choices."""
    prompt = '''Is this post politically charged?
Answer Yes or No. Give a short explanation in Korean only if the answer is yes.
TITLE: {title}
BODY: {body}'''.format(
        title = title,
        body = " ".join(stripped_body))
    messages = [ { "role": "user", "content": prompt } ]

    rsp = openai.chat.completions.create(
        model = "gpt-3.5-turbo",
        messages = messages,
    )

    if rsp.choices:
        return rsp.choices[0].message.content


def hasMarkerInComments(comments: list[dict[str, Any]]) -> bool:
    """True if any comment already carries a valid okkybot marker JWT."""
    ext = JWTMarkerExtractor()

    for c in comments:
        ext.marker.clear()
        try:
            ext.feed(c["text"])
            for m in ext.marker:
                if validateMarkerJWT(m):
                    return True
        except Exception as e:
            # FIX: ``"..." + e`` raised TypeError (str + Exception).
            sys.stderr.write(
                "Whilst validating marker in comments: " + str(e) +
                os.linesep)

    return False


def processPost(
        topic: str,
        pid: int,
        post: dict[str, Any],
        comments: list[dict[str, Any]],
        s: requests.Session):
    """Decide the verdict for one post and comment when it starts with yes."""
    result = ""

    title = post["title"]
    body = post["content"]["text"]

    stripped = stripHTML(body)
    combined = stripped.copy()
    combined.append(title)

    if hasMarkerInComments(comments):
        result = ProcPostResult.MARKER.value
    elif determineViability(combined):
        # FIX: doLLMPrompt() may return None (empty choices); the slicing
        # below then crashed. Fall back to the empty string.
        result = doLLMPrompt(title, stripped) or ""
    else:
        result = ProcPostResult.UNVIABLE.value

    print({
        "action": "LLM prompt result",
        "data": [
            topic,
            post["title"],
            result,
        ]
    })

    if (result[:len(ProcPostResult.YES.value)].lower() ==
            ProcPostResult.YES.value.lower()):
        writeComment(pid, result[len(ProcPostResult.YES.value) + 1:].strip(), s)


def doPost(topic: str, pid, s: requests.Session):
    """Fetch a post and its comments, then run processPost() on them."""
    url = "{api}/articles/{pid}".format(
        api = API_ENDPOINT,
        pid = urllib.parse.quote(str(pid)))
    post = fetchPostData(url, s)

    url = "{api}/comments/all?articleId={pid}".format(
        api = API_ENDPOINT,
        pid = urllib.parse.quote(str(pid)))
    comments = fetchPostData(url, s)

    processPost(topic, pid, post, comments, s)


def doTopic(topic: str, ts: StateCache.TopicState, s: requests.Session):
    """Walk a topic's pages, processing posts newer than ts.last_post."""
    processed_posts = list[int]()
    page = 0

    try:
        while True:  # for each page
            url = "{api}/articles?page={page}&categoryCode={topic}".format(
                api = API_ENDPOINT,
                topic = urllib.parse.quote(topic),
                page = page)
            posts = getPostList(url, s)
            processed = False

            # Assumes that the response is unordered
            # But halts when no post is processed in the page
            for post in posts:
                pid = post["id"]
                if pid > ts.last_post:
                    doPost(topic, pid, s)

                    processed = True
                    processed_posts.append(pid)
                    if len(processed_posts) >= MAX_POSTS_PER_TOPIC:
                        return

            if processed:
                page += 1
            else:
                break
    # TODO: catch AI API rate limit
    finally:
        # Record progress even on early return or exception.
        if processed_posts:
            ts.last_post = max(processed_posts)


with open(CONFIG_FILENAME) as f:
    conf = pyjson5.loads(f.read())

c = getCache()
s = requests.Session()
s.cookies = c.cookies
s.headers["User-Agent"] = USER_AGENT

openai.api_key = getOpenaiAPIKey()

try:
    if not checkSession(c, s):
        doSignin(c, s)

    for topic in TARGET_TOPICS:
        ts = c.topics.get(topic, StateCache.TopicState())
        doTopic(topic, ts, s)
        c.topics[topic] = ts
finally:
    # Always persist cookies and progress, even on failure.
    c.cookies = s.cookies
    saveCache(c)

exit(0)