Add working unit tests for new channel command

This commit is contained in:
Griatch 2021-04-20 23:57:18 +02:00
parent aa5a07f6d0
commit 3c4578b648
7 changed files with 309 additions and 134 deletions

View file

@ -30,7 +30,7 @@ __all__ = (
"CmdAddCom",
"CmdDelCom",
"CmdAllCom",
#"CmdChannels",
"CmdChannels",
"CmdCdestroy",
"CmdCBoot",
"CmdCemit",
@ -60,10 +60,10 @@ class CmdChannel(COMMAND_DEFAULT_CLASS):
channel/list
channel/all
channel/history channelname [= index]
channel/sub channelname [= alias]
channel/sub channelname [= alias[;alias...]]
channel/unsub channelname[,channelname, ...]
channel/alias channelname = alias
channel/unalias channelname = alias
channel/alias channelname = alias[;alias...]
channel/unalias alias
channel/mute channelname[,channelname,...]
channel/unmute channelname[,channelname,...]
channel/create channelname;alias;alias:typeclass [= description]
@ -87,6 +87,10 @@ class CmdChannel(COMMAND_DEFAULT_CLASS):
"list", "all", "history", "sub", "unsub", "mute", "unmute", "alias", "unalias",
"create", "destroy", "desc", "lock", "unlock", "boot", "ban", "unban", "who",)
# note - changing this will invalidate existing aliases in db
channel_msg_nick_alias = "{alias} $1"
channel_msg_nick_replacement = "channel {channelname} = $1"
def search_channel(self, channelname, exact=False):
"""
Helper function for searching for a single channel with some error
@ -181,6 +185,11 @@ class CmdChannel(COMMAND_DEFAULT_CLASS):
if channel.has_connection(caller):
return False, f"Already listening to channel {channel.key}."
result = channel.connect(caller)
key_and_aliases = [channel.key.lower()] + [alias.lower() for alias in channel.aliases.all()]
for key_or_alias in key_and_aliases:
self.add_alias(channel, key_or_alias)
return result, "" if result else f"Were not allowed to subscribe to channel {channel.key}"
def unsub_from_channel(self, channel):
@ -201,46 +210,87 @@ class CmdChannel(COMMAND_DEFAULT_CLASS):
if not channel.has_connection(caller):
return False, f"Not listening to channel {channel.key}."
# clear nicks
chkey = channel.key
for nick in [
nick
for nick in make_iter(caller.nicks.get(category="channel", return_obj=True))
if nick and nick.pk and nick.value[3].lower() == chkey
]:
nick.delete()
for key_or_alias in self.get_channel_aliases(channel):
self.remove_alias(key_or_alias)
result = channel.disconnect(caller)
return result, "" if result else f"Could not unsubscribe from channel {channel.key}"
def add_alias(self, channel, alias):
def add_alias(self, channel, alias, **kwargs):
"""
Add a new alias for the user to use with this channel.
Add a new alias (nick) for the user to use with this channel.
Args:
channel (Channel): The channel to alias.
alias (str): The personal alias to use for this channel.
**kwargs: If given, passed into nicks.add.
Note:
We add two nicks - one is a plain `alias -> channel.key` that
we need to be able to reference this channel easily. The other
is a templated nick to easily be able to send messages to the
channel without needing to give the full `channel` command. The
structure of this nick is given by `self.channel_msg_nick_alias`
and `self.channel_msg_nick_replacement`. By default it maps
`alias <msg> -> channel <channelname> = <msg>`, so that you can
for example just write `pub Hello` to send a message.
The alias created is `alias $1 -> channel channel = $1`, to allow
for sending to channel using the main channel command.
"""
self.caller.nicks.add(alias, channel.key, category="channel")
chan_key = channel.key.lower()
msg_alias = self.channel_msg_nick_alias.format(alias=alias)
msg_replacement = self.channel_msg_nick_replacement.format(channelname=chan_key)
def remove_alias(self, alias):
if chan_key != alias:
self.caller.nicks.add(alias, chan_key, category="channel", **kwargs)
self.caller.nicks.add(msg_alias, msg_replacement, category="channel", **kwargs)
def remove_alias(self, alias, **kwargs):
"""
Remove an alias from a given channel.
Remove an alias from a channel.
Args:
alias (str, optional): The alias to remove, or `None` for all.
alias (str, optional): The alias to remove.
The channel will be reverse-determined from the
alias, if it exists.
Returns:
bool, str: True, None if removal succeeded. If False,
the second part is an error string.
**kwargs: If given, passed into nicks.get/add.
Note:
This will remove two nicks - the plain channel alias and the templated
nick used for easily sending messages to the channel.
"""
caller = self.caller
channame = caller.nicks.get(key=alias, category="channel")
if channame:
caller.nicks.remove(key=alias, category="channel")
if caller.nicks.get(alias, category="channel", **kwargs):
msg_alias = self.channel_msg_nick_alias.format(alias=alias)
caller.nicks.remove(alias, category="channel", **kwargs)
caller.nicks.remove(msg_alias, category="channel", **kwargs)
return True, ""
return False, "No such alias was defined."
def get_channel_aliases(self, channel):
"""
Get a user's aliases for a given channel. The user is retrieved
through self.caller.
Args:
channel (Channel): The channel to act on.
Returns:
list: A list of zero, one or more alias-strings.
"""
chan_key = channel.key.lower()
nicktuples = self.caller.nicks.get(category="channel", return_tuple=True)
if nicktuples:
return [tup[2] for tup in nicktuples if tup[3].lower() == chan_key]
return []
def mute_channel(self, channel):
"""
Temporarily mute a channel.
@ -419,12 +469,13 @@ class CmdChannel(COMMAND_DEFAULT_CLASS):
nick.delete()
channel.disconnect(target)
reason = f" Reason: {reason}" if reason else ""
target.msg(f"You were booted from channel {channel.key} by {caller.key}.{reason}")
target.msg(f"You were booted from channel {channel.key} by {self.caller.key}.{reason}")
if not quiet:
channel.msg(f"{target.key} was booted from channel by {caller.key}.{reason}")
channel.msg(f"{target.key} was booted from channel by {self.caller.key}.{reason}")
logger.log_sec(f"Channel Boot: {target} (Channel: {channel}, "
f"Reason: {reason}, Caller: {caller}")
f"Reason: {reason.strip()}, Caller: {self.caller}")
return True, ""
def ban_user(self, channel, target, quiet=False, reason=""):
"""
@ -442,7 +493,7 @@ class CmdChannel(COMMAND_DEFAULT_CLASS):
the second part is an error string.
"""
boot_user(self.caller, channel, target, quiet=quiet, reason=reason)
self.boot_user(channel, target, quiet=quiet, reason=reason)
if channel.ban(target):
return True, ""
return False, f"{target} is already banned from this channel."
@ -508,6 +559,7 @@ class CmdChannel(COMMAND_DEFAULT_CLASS):
name = subscriber.get_display_name(caller)
conditions = ("muted" if subscriber in mute_list else "",
"offline" if subscriber not in online_list else "")
conditions = (cond for cond in conditions if cond)
cond_text = "(" + ", ".join(conditions) + ")" if conditions else ""
who_list.append(f"{name}{cond_text}")
@ -547,8 +599,6 @@ class CmdChannel(COMMAND_DEFAULT_CLASS):
EvTable: Table to display.
"""
caller = self.caller
comtable = self.styled_table(
"|wchannel|n",
"|wmy aliases|n",
@ -558,14 +608,12 @@ class CmdChannel(COMMAND_DEFAULT_CLASS):
)
for chan in subscribed:
clower = chan.key.lower()
nicks = caller.nicks.get(category="channel", return_obj=True)
my_aliases = ", ".join(self.get_channel_aliases(chan))
comtable.add_row(
*("{}{}".format(
chan.key,
"({})".format(",".join(chan.aliases.all())) if chan.aliases.all() else ""),
",".join(nick.db_key for nick in make_iter(nicks)
if nick and nick.value[3].lower() == clower),
chan.key,
"({})".format(",".join(chan.aliases.all())) if chan.aliases.all() else ""),
my_aliases,
chan.db.desc))
return comtable
@ -593,9 +641,7 @@ class CmdChannel(COMMAND_DEFAULT_CLASS):
channels = subscribed + available
for chan in channels:
clower = chan.key.lower()
nicks = caller.nicks.get(category="channel", return_obj=True)
nicks = nicks or []
my_aliases = ", ".join(self.get_channel_aliases(chan))
if chan not in subscribed:
substatus = "|rNo|n"
elif caller in chan.mutelist:
@ -607,8 +653,7 @@ class CmdChannel(COMMAND_DEFAULT_CLASS):
"{}{}".format(
chan.key,
"({})".format(",".join(chan.aliases.all())) if chan.aliases.all() else ""),
",".join(nick.db_key for nick in make_iter(nicks)
if nick.value[3].lower() == clower),
my_aliases,
str(chan.locks),
chan.db.desc))
comtable.reformat_column(0, width=9)
@ -666,6 +711,20 @@ class CmdChannel(COMMAND_DEFAULT_CLASS):
self.msg(err)
return
if 'unalias' in switches:
# remove a personal alias (no channel needed)
alias = self.rhs
if not alias:
self.msg("Specify the alias to remove as channel/unalias <alias>")
return
success, err = self.remove_alias(alias)
if success:
self.msg(f"Removed your channel alias '{alias}'.")
else:
self.msg(err)
return
channels = []
for channel_name in channel_names:
# find a channel by fuzzy-matching. This also checks
@ -691,7 +750,7 @@ class CmdChannel(COMMAND_DEFAULT_CLASS):
# inspect a given channel
subscribed, available = self.list_channels()
if channel in subscribed:
table = self.display_subbed_channels[channel]
table = self.display_subbed_channels([channel])
self.msg(
"\n|wSubscribed to Channel|n "
f"(use |w/all|n to see all available)\n{table}")
@ -717,11 +776,16 @@ class CmdChannel(COMMAND_DEFAULT_CLASS):
if 'sub' in switches:
# subscribe to a channel
aliases = set(alias.strip().lower() for alias in self.rhs.split(";"))
success, err = self.sub_to_channel(channel)
if success:
for alias in aliases:
self.add_alias(channel, alias)
alias_txt = ', '.join(aliases)
alias_txt = f" using alias(es) {alias_txt}" if aliases else ''
self.msg("You are now subscribed "
f"to the channel {channel.key}. Use /alias to "
"be able to use different names to refer to the channel.")
f"to the channel {channel.key}{alias_txt}. Use /alias to "
"add additional aliases for referring to the channel.")
else:
self.msg(err)
return
@ -746,19 +810,6 @@ class CmdChannel(COMMAND_DEFAULT_CLASS):
self.msg(f"Added/updated your alias '{alias}' for channel {channel.key}.")
return
if 'unalias' in switches:
# remove a personal alias for a channel
alias = self.rhs
if not alias:
self.msg("Specify the alias to remove as channel/unalias channelname = alias")
return
success, err = self.remove_alias(channel, alias)
if success:
self.msg(f"Removed your alias '{alias}' for channel {channel.key}")
else:
self.msg(err)
return
if 'mute' in switches:
# mute a given channel
success, err = self.mute_channel(channel)
@ -798,18 +849,33 @@ class CmdChannel(COMMAND_DEFAULT_CLASS):
"Aborted."
)
if 'desc' in switches:
# set channel description
desc = self.rhs.strip()
if not channel.access(caller, "control"):
self.msg("You can only change description of channels you control.")
return
if not desc:
self.msg("Usage: /desc channel = description")
return
self.set_desc(channel, desc)
self.msg("Updated channel description.")
if 'lock' in switches:
# add a lockstring to channel
lockstring = self.rhs.strip()
if not lockstring:
self.msg("Usage: channel/lock channelname = lockstring")
return
if not channel.access(caller, "control"):
self.msg("You need 'control'-access to change locks on this channel.")
return
if not lockstring:
self.msg("Usage: channel/lock channelname = lockstring")
return
success, err = self.set_lock(channel, self.rhs)
if success:
caller.msg("Added/updated lock on channel.")
@ -829,9 +895,9 @@ class CmdChannel(COMMAND_DEFAULT_CLASS):
self.msg("You need 'control'-access to change locks on this channel.")
return
success, err = self.set_lock(channel, self.rhs)
success, err = self.unset_lock(channel, self.rhs)
if success:
caller.msg("Removed lock on channel.")
caller.msg("Removed lock from channel.")
else:
caller.msg(f"Could not remove lock: {err}")
return
@ -848,12 +914,12 @@ class CmdChannel(COMMAND_DEFAULT_CLASS):
for chan in channels:
if not chan.access(caller, "admin"):
self.msg("You need 'control'-access to boot a user from {chan.key}.")
if not chan.access(caller, "control"):
self.msg(f"You need 'control'-access to boot a user from {chan.key}.")
return
# the target must be a member of all given channels
target = self.search(target_str, candidates=chan.subscriptions.all())
target = caller.search(target_str, candidates=chan.subscriptions.all())
if not target:
caller.msg(f"Cannot boot '{target_str}' - not in channel {chan.key}.")
return
@ -867,10 +933,12 @@ class CmdChannel(COMMAND_DEFAULT_CLASS):
caller.msg(f"Cannot boot {target.key} from channel {chan.key}: {err}")
channames = ", ".join(chan.key for chan in channels)
reasonwarn = (". Also note that your reason will be echoed to the channel"
if reason else '')
ask_yes_no(
caller,
f"Are you sure you want to boot user {target.key} from "
f"channel(s) {channames} (make sure name/channels are correct) "
f"channel(s) {channames} (make sure name/channels are correct{reasonwarn}). "
"{yesno}?",
_boot_user,
"Aborted.",
@ -885,7 +953,7 @@ class CmdChannel(COMMAND_DEFAULT_CLASS):
# view bans for channels
if not channel.access(caller, "control"):
self.msg("You need 'control'-access to view bans on channel {channel.key}")
self.msg(f"You need 'control'-access to view bans on channel {channel.key}")
return
bans = ["Channel bans "
@ -899,7 +967,12 @@ class CmdChannel(COMMAND_DEFAULT_CLASS):
for chan in channels:
# the target must be a member of all given channels
target = self.search(target_str, candidates=chan.subscriptions.all())
if not chan.access(caller, "control"):
caller.msg(f"You don't have access to ban users on channel {chan.key}")
return
target = caller.search(target_str, candidates=chan.subscriptions.all())
if not target:
caller.msg(f"Cannot ban '{target_str}' - not in channel {chan.key}.")
return
@ -908,26 +981,57 @@ class CmdChannel(COMMAND_DEFAULT_CLASS):
for chan in channels:
success, err = self.ban_user(chan, target, quiet=False, reason=reason)
if success:
caller.msg(f"Banned {target.key} from channel {chan.key}.")
self.msg(f"Banned {target.key} from channel {chan.key}.")
else:
caller.msg(f"Cannot boot {target.key} from channel {chan.key}: {err}")
self.msg(f"Cannot boot {target.key} from channel {chan.key}: {err}")
channames = ", ".join(chan.key for chan in channels)
reasonwarn = (". Also note that your reason will be echoed to the channel"
if reason else '')
ask_yes_no(
caller,
f"Are you sure you want to ban user {target.key} from "
f"channel(s) {channames} (make sure name/channels are correct) "
f"channel(s) {channames} (make sure name/channels are correct{reasonwarn}) "
"{yesno}?",
_ban_user,
"Aborted.",
)
return
if 'unban' in switches:
# unban a previously banned user from channel
target_str = self.rhs.strip()
if not target_str:
self.msg("Usage: channel[,channel,...] = user")
return
banlists = []
for chan in channels:
# the target must be a member of all given channels
if not chan.access(caller, "control"):
caller.msg(f"You don't have access to unban users on channel {chan.key}")
return
banlists.extend(chan.banlist)
target = caller.search(target_str, candidates=banlists)
if not target:
self.msg("Could not find a banned user '{target_str}' in given channel(s).")
return
for chan in channels:
success, err = self.unban_user(channel, target)
if success:
self.msg(f"Un-banned {target_str} from channel {chan.key}")
else:
self.msg(err)
return
if "who" in switches:
# view who's a member of a channel
who_list = [f"Subscribed to {channel.key}:"]
who_list.expand(self.channel_list_who(channel))
who_list.extend(self.channel_list_who(channel))
caller.msg("\n".join(who_list))
return