// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "okkybot",
"type": "debugpy",
"request": "launch",
"cwd": "${workspaceFolder}/src",
"module": "okkybot",
"args": [],
"justMyCode": true
},
]
"login": {
"id": "",
"password": ""
},
"api_keys": {
"chatgpt": ""
},
"marker": {
"secret": "",
"algorithm": "HS256"
}
requires = ["hatchling"]
build-backend = ""
name = "okkybot"
version = "0.0.0"
authors = [
{ name="David Timber", email="" },
description = "okkybot"
readme = ""
requires-python = ">= 3.10"
dependencies = [ "pyjson5", "openai", "pyjwt[crypto]" ]
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: Apache Software License",
"Operating System :: OS Independent",
Homepage = ""
Issues = ""
+from enum import Enum
+from html.parser import HTMLParser
+from typing import Any
+from urllib.parse import parse_qs, urlparse
+import requests.cookies
+import requests.sessions
class StateCache:
class TopicState:
def __init__ (self, saved = dict[str, Any]()):
self.last_post = saved.get("last_post", 0)
def get_dict (self) -> dict[str, Any]:
return {
"last_post": self.last_post
}
def parseCookies (doc: dict) -> requests.cookies.RequestsCookieJar:
pass
def __init__ (self, saved = dict[str, Any]):
self.topics = dict[str, dict[str, Any]]()
self.cookies = requests.sessions.cookiejar_from_dict(
saved.get("cookies", dict()))
for k, v in saved.get("topics", dict()).items():
self.topics[k] = StateCache.TopicState(v)
def get_dict (self) -> dict[str, Any]:
ret = dict[str, Any]()
if self.topics:
d = ret["topics"] = dict[str, Any]()
for k, v in self.topics.items():
d[k] = v.get_dict()
ret["cookies"] = self.cookies.get_dict()
return ret
# HTMLParser subclass; presumably gathers the text content of the parsed HTML
# into a list of strings (stripHTML feeds a document through it).
# NOTE(review): this block is incomplete in the reviewed text -- the
# assignment target in __init__ and the body of handle_data are missing.
# Restore both from the original file before relying on this class.
class InnerHTMLExtractor (HTMLParser):
def __init__ (self):
super().__init__()
# NOTE(review): assignment target missing; presumably an instance attribute
# that holds the collected list[str] -- confirm the attribute name.
= list[str]()
# HTMLParser callback invoked with each run of text found in the document.
# NOTE(review): the method body is missing here.
def handle_data(self, data: str):
class JWTMarkerExtractor (HTMLParser):
def __init__ (self):
super().__init__()
self.marker = list[str]()
def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]):
if tag.lower() != "a":
return
for kv in attrs:
if kv[0] == "href":
u = urlparse(kv[1])
if u.hostname != "" or u.path != "": continue
qs = parse_qs(u.query)
self.marker.extend(qs.get("okkybot-marker", []))
class ProcPostResult(Enum):
    """String outcomes used while processing a post."""

    # Assigned when the post exceeds the size limit and is not sent to the LLM.
    UNVIABLE = "Unviable"
    # Compared case-insensitively against the start of the LLM's answer.
    YES = "yes"
    NO = "no"
    # Assigned when a valid bot marker already exists in the comments.
    MARKER = "marker"
+import html
+import os
+import sys
+from typing import *
+import uuid
+import requests
+import pyjson5
+import openai
+import jwt
+import urllib.parse
+from okkybot import InnerHTMLExtractor, JWTMarkerExtractor, ProcPostResult, StateCache
# Bot version; embedded in the User-Agent header value below.
VERSION = "0"
USER_AGENT = f"Okkybot/{VERSION} (+"
# File names of the persisted state and of the configuration.
CACHE_FILENAME = "cache.json"
CONFIG_FILENAME = "config.jsonc"
# Topics visited by the main loop.
TARGET_TOPICS = ["community"]
POST_TOKEN_LIMIT = 5000  # $0.0025 spending limit per post
# While true, writeComment only prints the payload and sends nothing.
dryrun = True
def getCache() -> StateCache:
    """Load the saved bot state from CACHE_FILENAME.

    Returns:
        A StateCache built from the parsed file, or an empty StateCache when
        the file does not exist.
    """
    try:
        with open(CACHE_FILENAME) as f:
            # pyjson5.loads() parses a string, so hand it the file's text.
            doc = pyjson5.loads(f.read())
    except FileNotFoundError:
        return StateCache()
    return StateCache(doc)
def saveCache(c: StateCache):
    """Serialize *c* and overwrite CACHE_FILENAME with the result."""
    state = c.get_dict()
    with open(CACHE_FILENAME, "w") as out:
        serialized = pyjson5.dumps(state)
        out.write(serialized)
def getOpenaiAPIKey() -> str:
    """Return the "chatgpt" entry of the "api_keys" section in ``conf``.

    The result is None when the section or the entry is absent.
    """
    api_keys = conf.get("api_keys", dict())
    return api_keys.get("chatgpt")
def checkSession(c: StateCache, s: requests.Session) -> bool:
    """Report whether a probe request on *s* is answered with a 2xx status.

    Redirects are not followed, so a 3xx answer yields False.

    Args:
        c: Unused here; kept for the existing call signature.
        s: Session carrying the cookies to test.

    Raises:
        requests.HTTPError: via raise_for_status() on a 4xx/5xx answer.
    """
    # NOTE(review): the request URL is an empty string in this file --
    # confirm the intended endpoint.
    with s.get(
            "",
            allow_redirects=False) as rsp:
        rsp.raise_for_status()
        # Integer division gives the status class (2 for 2xx). The previous
        # modulo produced the last two digits, so a 200 was reported as
        # "not signed in".
        return rsp.status_code // 100 == 2
def doSignin(c: StateCache, s: requests.Session):
    """Sign in on session *s* with the "login" section of ``conf``.

    Args:
        c: Unused here; kept for the existing call signature.
        s: Session that sends the requests.

    Raises:
        KeyError: when ``conf`` has no "login" section.
        requests.HTTPError: via raise_for_status() on a 4xx/5xx answer to
            the sign-in request.
    """
    body = conf["login"]
    # NOTE(review): both request URLs are empty strings in this file --
    # confirm the intended endpoints.
    # The first response is discarded; presumably the request only exists to
    # pick up cookies before signing in -- TODO confirm.
    with s.get(""):
        pass
    # The credentials are sent as a JSON body.
    with s.post(
            "",
            json=body) as rsp:
        rsp.raise_for_status()
def fetchAPIData(url: str, s: requests.Session) -> str:
    """GET *url* through session *s* and return the response body as text.

    The HTTP status is not inspected.
    """
    with s.get(url) as response:
        body = response.text
    return body
def getPostList(url: str, s: requests.Session) -> list[dict[str, Any]]:
    """Fetch *url*, parse the body, and return its "content" entry."""
    raw = fetchAPIData(url, s)
    listing = pyjson5.loads(raw)
    return listing["content"]
def fetchPostData(url, s: requests.Session) -> dict[str, Any]:
    """Fetch *url* through *s* and return the parsed document unchanged."""
    return pyjson5.loads(fetchAPIData(url, s))
def issueMarkerJWT() -> str:
    """Create a signed marker token.

    The token carries the claims iss="okkybot", sub="marker" and a fresh
    random UUID as jti. Secret and algorithm are read from the "marker"
    section of ``conf``.
    """
    marker_conf = conf["marker"]
    claims = {
        "iss": "okkybot",
        "sub": "marker",
        "jti": str(uuid.uuid4()),
    }
    return jwt.encode(
        claims,
        marker_conf["secret"],
        algorithm=marker_conf["algorithm"])
def validateMarkerJWT(token: str) -> bool:
    """Check that *token* is a marker token signed with the configured secret.

    Args:
        token: Encoded JWT taken from a comment.

    Returns:
        True when the token decodes and carries iss="okkybot" and
        sub="marker"; False when it decodes but either claim differs or is
        absent.

    Raises:
        Whatever jwt.decode() raises for a token it cannot verify; callers
        are expected to handle that.
    """
    mc = conf["marker"]
    # jwt.decode() expects a list of accepted algorithm names. A bare string
    # only appeared to work because membership on a string is a substring
    # test.
    payload = jwt.decode(token, mc["secret"], algorithms=[mc["algorithm"]])
    # .get() keeps a token without these claims from raising KeyError.
    return payload.get("iss") == "okkybot" and payload.get("sub") == "marker"
def writeComment(pid: int, result: str, s: requests.Session):
    """Build the bot's comment for post *pid* and, unless in dry run, send it.

    The comment holds a fixed notice, *result* (HTML-escaped) in a
    blockquote, and a hidden anchor whose href carries a freshly issued
    marker token. The payload is always printed; it is only posted when the
    module-level ``dryrun`` flag is false.

    Args:
        pid: Id of the post to comment on.
        result: Explanation text to quote in the comment.
        s: Session used for the POST request.

    Raises:
        requests.HTTPError: via raise_for_status() on a 4xx/5xx answer.
    """
    # The placeholder was missing from this format string, so the issued
    # token never made it into the href.
    marker_href = '''?okkybot-marker={marker}'''.format(
        marker=urllib.parse.quote(issueMarkerJWT()))

    text = ""
    text += '''<p>킁킁. AI는 이 글이 정치적이라고 생각합니다:</p>'''
    text += '''<blockquote><p>{result}</p></blockquote>'''.format(
        result=html.escape(result))
    # FIXME: this gets filtered out ...
    # either the hidden attribute is the problem,
    # or the server tries to follow the href itself,
    # or the URL apparently has to be fully qualified
    text += '''<a hidden href="{href}">'''.format(href=marker_href)

    body = {
        "targetId": str(pid),
        "note": {
            "text": text
        },
        "textType": "HTML",
        "voted": 0
    }

    print({"action": "comment", "data": body})
    if not dryrun:
        with s.post(
                "{api}/comments".format(api=API_ENDPOINT),
                json=body) as rsp:
            rsp.raise_for_status()
# Feeds the HTML string x through a fresh InnerHTMLExtractor.
# NOTE(review): this block is incomplete in the reviewed text -- the return
# annotation (and the colon ending the def line) and the returned expression
# are missing. processPost calls .copy() and .append() on the result, so a
# list is expected; presumably the extractor's collected list[str] -- confirm.
def stripHTML (x: str) ->
+ parser = InnerHTMLExtractor()
+ parser.feed(x)
+ return
def determineViability(x: list[str]) -> bool:
    """Tell whether the text in *x* is small enough to process.

    Each string is split on whitespace; the post is viable when the total
    number of resulting words does not exceed POST_TOKEN_LIMIT.
    """
    word_count = sum(len(line.split()) for line in x)
    return word_count <= POST_TOKEN_LIMIT
# Builds a yes/no prompt from the post title and body, sends it as a single
# user message, and returns the content of the first choice in the response.
# Falls through (returning None) when the response has no choices.
# NOTE(review): stripped_body is annotated as str but is joined with " " and
# processPost passes the value returned by stripHTML -- presumably it should
# be list[str]; confirm and fix the annotation.
+def doLLMPrompt (title: str, stripped_body: str):
+ prompt = '''Is this post politically charged?
+Answer Yes or No. Give a short explanation in Korean only if the answer is yes.
+TITLE: {title}
+BODY: {body}'''.format(
+ title = title,
+ body = " ".join(stripped_body))
+ messages = [ { "role": "user", "content": prompt } ]
# NOTE(review): the callee on the right-hand side is missing in the reviewed
# text; presumably a chat-completion call of the openai package -- restore it
# from the original file.
+ rsp =
+ model = "gpt-3.5-turbo",
+ messages = messages,
+ )
+ if rsp.choices:
+ return rsp.choices[0].message.content
def hasMarkerInComments(comments: list[dict[str, Any]]) -> bool:
    """Tell whether any comment already carries a valid bot marker.

    Each comment's "text" is parsed for marker links, and every marker found
    is validated. A failure while handling one comment is reported on stderr
    and the remaining comments are still checked.

    Args:
        comments: Comment records; each is expected to have a "text" entry
            holding HTML.

    Returns:
        True as soon as one marker validates, otherwise False.
    """
    ext = JWTMarkerExtractor()
    for c in comments:
        # The extractor accumulates markers, so start each comment empty.
        ext.marker.clear()
        try:
            ext.feed(c["text"])
            for m in ext.marker:
                if validateMarkerJWT(m):
                    return True
        except Exception as e:
            # Format the exception into the message: concatenating it to a
            # str raised TypeError from inside this handler.
            sys.stderr.write(
                f"Whilst validating marker in comments: {e}{os.linesep}")
    return False
def processPost(
        topic: str,
        pid: int,
        post: dict[str, Any],
        comments: list[dict[str, Any]],
        s: requests.Session):
    """Classify one post and comment on it when the LLM answers "yes".

    The result is the MARKER value when the comments already hold a valid
    marker, the LLM's answer when the post is within the size limit, and the
    UNVIABLE value otherwise. The result is printed; a comment is written
    only when it starts with "yes" (case-insensitive).

    Args:
        topic: Topic name, used for the printed record only.
        pid: Id of the post.
        post: Post record with "title" and "content" -> "text" entries.
        comments: Comment records of the post.
        s: Session used to write the comment.
    """
    title = post["title"]
    body = post["content"]["text"]
    stripped = stripHTML(body)
    # The size check covers the body text plus the title.
    combined = stripped.copy()
    combined.append(title)

    if hasMarkerInComments(comments):
        result = ProcPostResult.MARKER.value
    elif determineViability(combined):
        # doLLMPrompt yields None when the response has no usable answer;
        # treat that as an empty answer instead of failing on the slice below.
        result = doLLMPrompt(title, stripped) or ""
    else:
        result = ProcPostResult.UNVIABLE.value

    print({
        "action": "LLM prompt result",
        "data": [
            topic,
            post["title"],
            result,
        ]
    })

    yes = ProcPostResult.YES.value
    if result[:len(yes)].lower() == yes.lower():
        # Skip "yes" plus the one character after it (presumably the
        # punctuation following the word) and keep the explanation.
        writeComment(pid, result[len(yes) + 1:].strip(), s)
def doPost(topic: str, pid, s: requests.Session):
    """Fetch post *pid* and its comments, then hand both to processPost.

    Args:
        topic: Topic the post was listed under.
        pid: Id of the post; converted to str and URL-quoted.
        s: Session used for all requests.
    """
    quoted_pid = urllib.parse.quote(str(pid))
    # Both templates contain {api}; it must be supplied (as done for the
    # comments URL in writeComment) or format() raises KeyError.
    url = "{api}/articles/{pid}".format(
        api=API_ENDPOINT,
        pid=quoted_pid)
    post = fetchPostData(url, s)

    url = "{api}/comments/all?articleId={pid}".format(
        api=API_ENDPOINT,
        pid=quoted_pid)
    comments = fetchPostData(url, s)

    processPost(topic, pid, post, comments, s)
def doTopic(topic: str, ts: StateCache.TopicState, s: requests.Session):
    """Process the posts of *topic* that are newer than ``ts.last_post``.

    Pages are fetched one after another until a page contains no new post or
    MAX_POSTS_PER_TOPIC posts have been handled. On every exit path,
    including an exception, ``ts.last_post`` is advanced to the highest id
    handled in this call.

    Args:
        topic: Category code of the topic.
        ts: Progress record of the topic; updated in place.
        s: Session used for all requests.
    """
    processed_posts = list[int]()
    page = 0

    try:
        while True:  # for each page
            # The template contains {api}; it must be supplied or format()
            # raises KeyError.
            url = "{api}/articles?page={page}&categoryCode={topic}".format(
                api=API_ENDPOINT,
                topic=urllib.parse.quote(topic),
                page=page)
            posts = getPostList(url, s)
            processed = False

            # Assumes that the response is unordered
            # But halts when no post is processed in the page
            for post in posts:
                pid = post["id"]
                if pid > ts.last_post:
                    doPost(topic, pid, s)
                    processed = True
                    processed_posts.append(pid)
                    if len(processed_posts) >= MAX_POSTS_PER_TOPIC:
                        return

            if processed:
                page += 1
            else:
                break
    # TODO: catch AI API rate limit
    finally:
        if processed_posts:
            ts.last_post = max(processed_posts)
# Script body: load the configuration, restore the cached state into a
# requests session, sign in when needed, process every target topic, then
# store the session cookies and save the cache.
+with open(CONFIG_FILENAME) as f:
# NOTE(review): the argument of pyjson5.loads( is missing in the reviewed
# text; presumably the file's contents -- restore it from the original file.
+ conf = pyjson5.loads(
+c = getCache()
+s = requests.Session()
# Reuse the cookies saved by the previous run.
+s.cookies = c.cookies
+s.headers["User-Agent"] = USER_AGENT
+openai.api_key = getOpenaiAPIKey()
# NOTE(review): the lines below are indented deeper than the statement above,
# so an enclosing statement (possibly try/finally around the work and the
# final save) appears to be missing -- confirm against the original file.
+ if not checkSession(c, s):
+ doSignin(c, s)
+ for topic in TARGET_TOPICS:
# A topic without saved state starts from a fresh TopicState.
+ ts = c.topics.get(topic, StateCache.TopicState())
+ doTopic(topic, ts, s)
+ c.topics[topic] = ts
+ c.cookies = s.cookies
+ saveCache(c)