-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathfactoids.py
169 lines (142 loc) · 5.87 KB
/
factoids.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import asyncio, re
from argparse import ArgumentParser
from typing import Dict, List, Optional
from irctokens import build, Line
from ircrobots import Bot as BaseBot
from ircrobots import Server as BaseServer
from ircrobots import ConnectionParams
TRIGGER = "!"
def _delims(s: str, delim: str):
s_copy = list(s)
while s_copy:
char = s_copy.pop(0)
if char == delim:
if not s_copy:
yield len(s)-(len(s_copy)+1)
elif not s_copy.pop(0) == delim:
yield len(s)-(len(s_copy)+2)
def _sed(sed: str, s: str) -> Optional[str]:
if len(sed) > 1:
delim = sed[1]
last = 0
parts: List[str] = []
for i in _delims(sed, delim):
parts.append(sed[last:i])
last = i+1
if len(parts) == 4:
break
if last < (len(sed)):
parts.append(sed[last:])
_, pattern, replace, *args = parts
flags_s = (args or [""])[0]
flags = re.I if "i" in flags_s else 0
count = 0 if "g" in flags_s else 1
for i in reversed(list(_delims(replace, "&"))):
replace = replace[:i] + "\\g<0>" + replace[i+1:]
try:
compiled = re.compile(pattern, flags)
except:
return None
return re.sub(compiled, replace, s, count)
else:
return None
class Database:
def __init__(self):
self._settings: Dict[str, str] = {}
async def get(self, context: str, setting: str) -> Optional[str]:
return self._settings.get(setting, None)
async def set(self, context: str, setting: str, value: str):
self._settings[setting] = value
async def rem(self, context: str, setting: str):
if setting in self._settings:
del self._settings[setting]
class Server(BaseServer):
def __init__(self, bot: Bot, name: str, channel: str, database: Database):
super().__init__(bot, name)
self._channel = channel
self._database = database
async def line_send(self, line: Line):
print(f"> {line.format()}")
async def line_read(self, line: Line):
print(f"< {line.format()}")
me = self.nickname_lower
if line.command == "001":
await self.send(build("JOIN", [self._channel]))
if (
line.command == "PRIVMSG" and
self.has_channel(line.params[0]) and
not line.hostmask is None and
not self.casefold(line.hostmask.nickname) == me and
self.has_user(line.hostmask.nickname) and
line.params[1].startswith(TRIGGER)):
channel = self.channels[self.casefold(line.params[0])]
user = self.users[self.casefold(line.hostmask.nickname)]
cuser = channel.users[user.nickname_lower]
text = line.params[1].replace(TRIGGER, "", 1)
db_context = f"{self.name}:{channel.name}"
name, _, text = text.partition(" ")
action, _, text = text.partition(" ")
name = name.lower()
key = f"factoid-{name}"
out = ""
if not action or action == "@":
value = await self._database.get(db_context, key)
if not value is None:
out = f"({name}) {value}"
if action == "@" and text:
target, _, _ = text.partition(" ")
out = f"{target}: {out}"
else:
out = f"{user.nickname}: '{name}' not found"
elif action in ["==", "~="]:
if "o" in cuser.modes:
value, _, _ = text.partition(" ")
if action == "==":
if value:
await self._database.set(db_context, key, value)
out = f"{user.nickname}: added factoid {name}"
else:
await self._database.rem(db_context, key)
out = f"{user.nickname}: removed factoid {name}"
elif action == "~=":
current = await self._database.get(db_context, key)
if current is None:
out = f"{user.nickname}: '{name}' not found"
elif value:
changed = _sed(value, current)
if not changed is None:
await self._database.set(
db_context, key, changed)
out = (f"{user.nickname}: "
f"changed '{name}' factoid")
else:
out = f"{user.nickname}: invalid sed"
else:
out = f"{user.nickname}: please provide a sed"
else:
out = f"{user.nickname}: you are not an op"
else:
out = f"{user.nickname}: unknown action '{action}'"
await self.send(build("PRIVMSG", [line.params[0], out]))
class Bot(BaseBot):
def __init__(self, channel: str):
super().__init__()
self._channel = channel
def create_server(self, name: str):
return Server(self, name, self._channel, Database())
async def main(hostname: str, channel: str, nickname: str):
bot = Bot(channel)
params = ConnectionParams(
nickname,
hostname,
6697
)
await bot.add_server("freenode", params)
await bot.run()
if __name__ == "__main__":
parser = ArgumentParser(description="A simple IRC bot for factoids")
parser.add_argument("hostname")
parser.add_argument("channel")
parser.add_argument("nickname")
args = parser.parse_args()
asyncio.run(main(args.hostname, args.channel, args.nickname))