Initial commit

This commit is contained in:
2026-02-23 22:48:25 +00:00
commit b8182ead88
46 changed files with 1845 additions and 0 deletions

28
Bot/__main__.py Normal file
View File

@@ -0,0 +1,28 @@
__author__ = 'hor00s'
__email__ = 'hor00s199@gmail.com'
__version__ = '1.0.0 beta'
__description__ = 'This bot will go through a sub and\
alert mods through modmail if a post has been deleted'
__license__ = 'MIT'
__dependencies__ = ('praw',)
__disclaimer__ = """Disclaimer:
This Python bot is a personal hobby project created for fun and learning purposes.
It is not intended for any commercial use or critical tasks.
While efforts have been made to ensure its functionality, there may be bugs and/or errors.
The bot is provided as-is, without any warranty or guarantee of its performance.
Use it at your own risk.
The author and maintainers of this bot are not responsible for any damages, data loss,
or any adverse consequences that may arise from its use.
We do not collect or store any personal data or information from users."""
import sys
from main import main
if __name__ == '__main__':
sys.exit(
main()
)

Binary file not shown.

Binary file not shown.

1
Bot/bot/__init__.py Normal file
View File

@@ -0,0 +1 @@
from .post import * # noqa

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

24
Bot/bot/post.py Normal file
View File

@@ -0,0 +1,24 @@
from pathlib import Path
from sqlitewrapper import Model, Datatype, Row
__all__ = (
'Datatype',
'Posts',
'Row',
)
class Posts(Model):
def __init__(self, db_name: str, save_path: Path) -> None:
self.__table = {
'username': Datatype.STR,
'title': Datatype.STR,
'text': Datatype.STR,
'post_id': Datatype.STR,
'deletion_method': Datatype.STR,
'post_last_edit': Datatype.STR,
'record_created': Datatype.STR,
'record_edited': Datatype.STR,
}
super().__init__(db_name, save_path, **self.__table)

View File

@@ -0,0 +1 @@
from .autosavedict import * # noqa

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,120 @@
from __future__ import annotations
import os
import json
from typing import (
Any,
Dict,
Tuple,
Mapping,
Optional,
Iterable,
MutableMapping,
)
__all__ = (
'AutoSaveDict',
)
class AutoSaveDict(dict[Any, Any]):
def __init__(self,
file_path: Optional[os.PathLike[Any]] = None,
**pairs: Any):
self.file_path = file_path
self.__default = pairs
if self.file_path is None:
self._pairs = pairs
elif not os.path.exists(self.file_path):
self._pairs = pairs
else:
self._pairs = self._read()
super(AutoSaveDict, self).__init__(**self._pairs)
def __setitem__(self, __key: Any, __value: Any) -> None:
data = self._read()
data[__key] = __value
self._write(data)
super().__setitem__(__key, __value)
def __delitem__(self, __key: Any) -> None:
data = self._read()
del data[__key]
self._write(data)
return super().__delitem__(__key)
def __or__(self, __value: Mapping[Any, Any]) -> AutoSaveDict:
data = self._pairs | __value
return AutoSaveDict(None, **data)
def _write(self, content: Dict[Any, Any]) -> None:
with open(self.file_path, mode='w') as f: # type: ignore
json.dump(content, f, indent=4)
self._pairs = content
def _read(self) -> Dict[Any, Any]:
with open(self.file_path, mode='r') as f: # type: ignore
data: Dict[Any, Any] = json.load(f)
return data
@classmethod
def fromkeys(cls, __iterable: Iterable[Any], __value: Any = None,
file_path: Optional[os.PathLike[Any]] = None) -> AutoSaveDict:
data = {}
for key in __iterable:
data[key] = __value
return cls(file_path, **data)
@classmethod
def frommapping(cls, __mapping: Mapping[Any, Any],
file_path: Optional[os.PathLike[Any]] = None)\
-> AutoSaveDict:
data = dict(__mapping)
return cls(file_path, **data)
@classmethod
def fromfile(cls, src: os.PathLike[Any],
dst: Optional[os.PathLike[Any]] = None) -> AutoSaveDict:
with open(src, mode='r') as f:
data = json.load(f)
return AutoSaveDict(dst, **data)
def init(self) -> None:
if not os.path.exists(self.file_path): # type: ignore
self._write(self._pairs)
else:
self._pairs = self._read()
def restore(self) -> None:
self.clear()
self.update(self.__default)
self.init()
def copy(self,
file_path: Optional[os.PathLike[Any]] = None) -> AutoSaveDict:
data = {}
for key, val in self.items():
data[key] = val
return AutoSaveDict(file_path, **data)
def pop(self, __key: Any) -> Any: # type: ignore
data = self._read()
data.pop(__key)
self._write(data)
return super().pop(__key)
def popitem(self) -> Tuple[Any, Any]:
key = tuple(self._read().keys())[-1]
value = self.pop(key)
super().popitem()
return (key, value)
def clear(self) -> None:
self._write({})
super().clear()
def update(self, __m: Mapping[Any, Any]) -> MutableMapping: # type: ignore
for k, v in __m.items():
self[k] = v
super().update(__m)

146
Bot/jsonwrapper/tests.py Normal file
View File

@@ -0,0 +1,146 @@
import os
import json
import unittest
from pathlib import Path
from .autosavedict import AutoSaveDict
BASE_DIR = f'{os.sep}'.join(__file__.split(os.sep)[:-1])
class TestAutoSaveDict(unittest.TestCase):
def setUp(self) -> None:
self.path = Path(BASE_DIR) / 'test.json'
self.default = {'a': 1, 'b': 2}
asd = AutoSaveDict(self.path, **self.default)
asd.init()
return super().setUp()
def tearDown(self) -> None:
os.remove(self.path)
return super().tearDown()
def test_init(self) -> None:
asd = AutoSaveDict(self.path, **self.default)
asd.init()
self.assertTrue(os.path.exists(self.path))
self.assertEqual(asd, self.default)
self.assertEqual(asd._pairs, self.default)
self.assertEqual(asd._read(), self.default)
def test_restore(self) -> None:
asd = AutoSaveDict(self.path, **self.default)
asd['c'] = 3
self.assertIn('c', asd)
asd.restore()
self.assertEqual(asd, self.default)
self.assertEqual(asd._read(), self.default)
self.assertEqual(asd._pairs, self.default)
def test_pop(self) -> None:
asd = AutoSaveDict(self.path, **self.default)
asd.init()
key = tuple(self.default.keys())[0]
asd.pop(key)
self.assertNotIn(key, asd._read())
self.assertNotIn(key, asd._pairs)
self.assertNotIn(key, asd)
def test_popitem(self) -> None:
asd = AutoSaveDict(self.path, **self.default)
asd.init()
key = tuple(asd.keys())[-1]
val = asd[key]
result = asd.popitem()
self.assertEqual(result, (key, val))
self.assertNotIn(key, asd)
self.assertNotIn(key, asd._read())
self.assertNotIn(key, asd._pairs)
def test_update(self) -> None:
asd = AutoSaveDict(self.path, **self.default)
asd.init()
new = {'b': 3}
asd.update(new)
self.assertEqual(asd['b'], new['b'])
self.assertEqual(asd._read()['b'], new['b'])
self.assertEqual(asd._pairs['b'], new['b'])
def test_copy(self) -> None:
copy_path = 'test_copy.json'
asd = AutoSaveDict(self.path, **self.default)
asd_copy = asd.copy(copy_path) # type: ignore
asd.init()
asd_copy.init()
self.assertEqual(asd_copy, asd)
self.assertEqual(asd_copy, asd._read())
self.assertEqual(asd_copy, asd._pairs)
os.remove(copy_path)
def test_fromfile(self) -> None:
file = 'test_fromfile.json'
config = {'b': 3, 'c': 4}
with open(file, mode='w') as f:
json.dump(config, f)
os.remove(self.path)
asd = AutoSaveDict.fromfile(file, self.path) # type: ignore
asd.init()
self.assertEqual(config, asd)
self.assertEqual(config, asd._pairs)
self.assertEqual(config, asd._read())
self.assertIsInstance(asd, AutoSaveDict)
os.remove(file)
def test_frommapping(self) -> None:
path = 'test_frommapping.json'
mapping = (
('a', 1),
('b', 2),
('c', 3),
)
expected = dict(mapping)
asd = AutoSaveDict.frommapping(mapping, path) # type: ignore
asd.init()
self.assertEqual(expected, asd)
self.assertEqual(expected, asd._read())
self.assertEqual(expected, asd._pairs)
os.remove(path)
def test_fromkeys(self) -> None:
path = 'test_fromkeys.json'
keys = 'test'
expected = dict.fromkeys(keys)
asd = AutoSaveDict.fromkeys(keys, file_path=path) # type: ignore
asd.init()
self.assertEqual(asd, expected)
self.assertEqual(asd._read(), expected)
self.assertEqual(asd._pairs, expected)
os.remove(path)
def test_setitem(self) -> None:
expected = {**self.default, "z": 3}
asd = AutoSaveDict(self.path, **self.default)
asd['z'] = 3
self.assertEqual(asd, expected)
self.assertEqual(asd._read(), expected)
self.assertEqual(asd._pairs, expected)
def test_delitem(self) -> None:
config = self.default.copy()
key = tuple(config.keys())[0]
asd = AutoSaveDict(self.path, **config)
asd.init()
del config[key]
del asd[key]
self.assertEqual(asd, config)
self.assertEqual(asd._read(), config)
self.assertEqual(asd._pairs, config)

1
Bot/logger/__init__.py Normal file
View File

@@ -0,0 +1 @@
from .logger import * # noqa

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

320
Bot/logger/logger.py Normal file
View File

@@ -0,0 +1,320 @@
from __future__ import annotations
import os
import sys
from enum import Enum
from inspect import currentframe
from typing import (
Optional,
Tuple,
Dict,
List,
Any,
)
__name__ = 'testing'
__all__ = [
'UnhandledLogError',
'get_color',
'Logger',
]
class Color(Enum):
# Color end string, color reset
RESET = "\033[0m"
# Regular Colors. Normal color, no bold, background color etc.
BLACK = "\033[0;30m" # BLACK
RED = "\033[0;31m" # RED
GREEN = "\033[0;32m" # GREEN
YELLOW = "\033[0;33m" # YELLOW
BLUE = "\033[0;34m" # BLUE
MAGENTA = "\033[0;35m" # MAGENTA
CYAN = "\033[0;36m" # CYAN
WHITE = "\033[0;37m" # WHITE
# Bold colors
BLACK_BOLD = "\033[1;30m" # BLACK
RED_BOLD = "\033[1;31m" # RED
GREEN_BOLD = "\033[1;32m" # GREEN
YELLOW_BOLD = "\033[1;33m" # YELLOW
BLUE_BOLD = "\033[1;34m" # BLUE
MAGENTA_BOLD = "\033[1;35m" # MAGENTA
CYAN_BOLD = "\033[1;36m" # CYAN
WHITE_BOLD = "\033[1;37m" # WHITE
def get_color(color: str) -> str:
"""Colors:
```
(
"RESET",
"BLACK",
"RED",
"GREEN",
"YELLOW",
"BLUE",
"MAGENTA",
"CYAN",
"WHITE",
"BLACK_BOLD",
"RED_BOLD",
"GREEN_BOLD",
"YELLOW_BOLD",
"BLUE_BOLD",
"MAGENTA_BOLD",
"CYAN_BOLD",
"WHITE_BOLD",
)
```
"""
return {i.name: i.value for i in Color}[color.upper()]
class Config:
_INSTANCE = 0
def __init__(self, level: int) -> None:
Config._INSTANCE += 1
self._INSTANCE = Config._INSTANCE
self._settings = {
'success': 2,
'info': 3,
'custom': 2,
'warning': 1,
'error': 1,
'debug': 2,
}
self.level = level
self._iter = 0
def __str__(self) -> str:
return f"<{self.__class__.__name__}({self._settings})>"
def __repr__(self) -> str:
return f"<{self.__class__.__name__}(settings={self._settings},\
level={self._level} instance={self._INSTANCE})>"
def __bool__(self) -> bool:
return bool(self._settings)
def __getitem__(self, k: str) -> int:
return self._settings[k]
def __int__(self) -> int:
return self.level
def __len__(self) -> int:
return len(self._settings)
def __contains__(self, __o: Any) -> bool:
return __o in self._settings
def __iter__(self) -> Config:
return self
def __next__(self) -> str:
keys = self.keys()
if self._iter >= len(self):
self._iter = 0
raise StopIteration
retval = keys[self._iter]
self._iter += 1
return retval
@property
def settings(self) -> Dict[str, int]:
return self._settings
@property
def level(self) -> int:
return self._level
@level.setter
def level(self, v: int) -> None:
"""Level setter validator
:param v: The level to change to
:type v: int
:raises ValueError: If the v is invalid for `level`
"""
if not 0 < v <= 5:
raise ValueError(f"Level must be between `0-5` not `{v}`")
self._level = v
def items(self): # type: ignore
yield self._settings.items()
def keys(self) -> List[str]:
return list(self._settings.keys())
def values(self) -> List[int]:
return list(self._settings.values())
def update(self, **settings: int) -> None:
"""Change the settings configuration for a given instance
:raises ValueError: If the configuraion does not exist or\
user tries to update to an invalid value
"""
if all(key in self.settings for key in settings) and\
all(0 < settings[key] <= 5 for key in settings):
self._settings.update(settings)
else:
raise ValueError(f"Invalid key or value in {settings}. Remember,\
key has to exists in {self.settings} and all values have to be between (1-5)")
def get(self, key: str) -> int:
"""Get a setting configuration
:param key: Key of the configuration
:type key: str
:return: The level this configuration is set to
:rtype: int
"""
return self._settings[key]
class UnhandledLogError(Exception): ...
class MetaLogger(type):
"""A simple metaclass that checks if all logs/settings are correctly
handled by comaring the ammount of settings in Config._settings agnainst
the functions that live in Logger() - some unnecessary
"""
def __new__(self, name: str, bases:
Tuple[type], attrs: Dict[str, Any]) -> type:
error_msg = "We either have a log function that is not in settings OR\
a function that needs to be dissmissed OR a setting that is\
not added as a log function"
log_functions = 0
target_log_functions = len(Config(1))
dismiss_attrs = ('settings', 'get_line_info')
for log in attrs:
if not log.startswith('_') and log not in dismiss_attrs:
log_functions += 1
if not log_functions == target_log_functions:
raise UnhandledLogError(error_msg)
return type(name, bases, attrs)
class Logger(metaclass=MetaLogger):
"""The Logger class handles debbuging with colored information
based on the level. Each instance has its own settings which the
user can change independently through `self.settings.update()`.
Mind that level 1 will print evetything and level 5 less
"""
def __init__(self, level: int = 2, log_path: Optional[str] = None):
"""Initializer of Logger object
:param level: The level of debugging. Based on that, some informations\
can be configured to not show up thus lowering the verbosity, defaults to 2
:type level: int, optional
"""
self._settings = Config(level)
self._log_path = log_path
def __str__(self) -> str:
return f"<{self.__class__.__name__}Object-{self._settings._INSTANCE}>"
def __repr__(self) -> str:
return f"<{self.__class__.__name__}(instance={self._settings._INSTANCE})>"
@property
def settings(self) -> Config:
"""Getter so self._settings cannot be writable
:return: Config instance
:rtype: Config
"""
return self._settings
def _log(self, header: str, msg: str) -> None:
"""If a file log_path is provided in `Logger.__init__`
:param header: The msg header WARNING/ERROR ...
:type header: str
:param msg: The message to be logged
:type msg: str
"""
if self._log_path is not None:
if not os.path.exists(self._log_path):
with open(self._log_path, mode='w') as _: ...
with open(self._log_path, mode='a') as f:
f.write(f"[{header.upper()}]: {msg}\n")
def _runner(self, func_name: str) -> bool:
"""Use to check the `self.level` before printing the log
:param func_name: Name of the function as a string
:type func_name: str
:return: Wheather the settings allow this certain function to print
:rtype: bool
"""
return self.settings.level <= self.settings[func_name]
def get_line_info(self, file: str, prompt: str) -> str:
"""Get the file and the line of where this function is called
:param file: The file where the func is called (Recommended: `__file__`)
:type file: str
:param prompt: Any message to follow after line info
:type prompt: str
:return: *file path*, *line number* *prompt*
:rtype: str
"""
cf = currentframe()
msg = f"File \"{file}\", line {cf.f_back.f_lineno}" # type: ignore
self.debug(f"{msg} : {prompt}")
print(file)
return msg
# ONLY LOGGING FUNCTIONS AFTER THIS
def custom(self, msg: str, header: str = 'custom',
*args: Any, color: str = get_color('reset'), **kwargs: Any) -> None:
func_name = sys._getframe().f_code.co_name
if self._runner(func_name):
print(f"{color}[{header.upper()}]: {msg}{get_color('reset')}", *args, end='', **kwargs)
print(get_color('reset'))
self._log(header, msg)
def info(self, msg: str, *args: Any, **kwargs: Any) -> None:
func_name = sys._getframe().f_code.co_name
if self._runner(func_name):
print(f"{Color.YELLOW.value}[{func_name.upper()}]: {msg}",
*args, end='', **kwargs)
print(get_color('reset'))
self._log(func_name, msg)
def success(self, msg: str, *args: Any, **kwargs: Any) -> None:
func_name = sys._getframe().f_code.co_name
if self._runner(func_name):
print(f"{Color.GREEN.value}[{func_name.upper()}]: {msg}",
*args, end='', **kwargs)
print(get_color('reset'))
self._log(func_name, msg)
def warning(self, msg: str, *args: Any, **kwargs: Any) -> None:
func_name = sys._getframe().f_code.co_name
if self._runner(func_name):
print(f"{Color.RED.value}[{func_name.upper()}]: {msg}",
*args, end='', **kwargs)
print(get_color('reset'))
self._log(func_name, msg)
def error(self, msg: str, *args: Any, **kwargs: Any) -> None:
func_name = sys._getframe().f_code.co_name
if self._runner(func_name):
print(f"{Color.RED_BOLD.value}[{func_name.upper()}]: {msg}",
*args, end='', **kwargs)
print(get_color('reset'))
self._log(func_name, msg)
def debug(self, msg: str, *args: Any, **kwargs: Any) -> None:
func_name = sys._getframe().f_code.co_name
if self._runner(func_name):
print(f"{Color.BLUE.value}[{func_name.upper()}]: {msg}",
*args, end='', **kwargs)
print(get_color('reset'))
self._log(func_name, msg)

123
Bot/logger/tests.py Normal file
View File

@@ -0,0 +1,123 @@
import os
import io
import sys
import unittest
import unittest.mock
from pathlib import Path
from logger import Logger
from .logger import Config
from typing import Any
BASE_DIR = Path(__file__).parent
class TestLogger(unittest.TestCase):
def setUp(self) -> None:
self.logger = Logger(1)
def tearDown(self) -> None:
self.logger = Logger(1)
def test_invalid_instance(self) -> None:
with self.assertRaises(ValueError, msg='An instance with level=0 is created'):
Logger(0)
with self.assertRaises(ValueError, msg='An instance with level=6 is created'):
Logger(6)
logger = Logger(2)
with self.assertRaises(ValueError, msg='A settings instance changed level above limit after creation'): # noqa
logger.settings.level = 6
logger = Logger(2)
with self.assertRaises(ValueError, msg='A settings instance changed level above limit after creation'): # noqa
logger.settings.level = 0
def test_instance_counter(self) -> None:
Config._INSTANCE = 0
l1 = Logger()
self.assertEqual(l1.settings._INSTANCE, 1, msg="First instance counter is wrong")
l2 = Logger()
self.assertEqual(l2.settings._INSTANCE, 2, msg="Second instance counter is wrong")
self.assertEqual(l1.settings._INSTANCE, 1, msg="Repeated instance counter is wrong")
def test_update_settings(self) -> None:
self.logger.settings.update(success=4)
self.assertEqual(self.logger.settings['success'], 4)
with self.assertRaises(TypeError, msg="self.logger object is directly assignable"):
self.logger.settings['success'] = 4 # type: ignore
with self.assertRaises(ValueError, msg="not existing key passed into self.settings"):
self.logger.settings.update(not_exists=4)
with self.assertRaises(TypeError, msg="String passed as value in self.settings"):
self.logger.settings.update(success='4') # type: ignore
with self.assertRaises(ValueError, msg='0 Passed as valid key in self.settings'):
self.logger.settings.update(success=0)
with self.assertRaises(ValueError, msg="Above the allowed limit passed as valid key in self.settings"): # noqa
self.logger.settings.update(success=6)
with self.assertRaises(AttributeError, msg="self.settings is overwritable (`=`)"):
self.logger.settings.settings = 'fsaf' # type: ignore
self.logger.settings.update(success=2, info=1)
self.assertEqual(self.logger.settings['success'], 2, msg='Error in updating 2 values at once at `success`') # noqa
self.assertEqual(self.logger.settings['info'], 1, msg='Error in updating 2 values at once at `info`') # noqa
def test_get_settings(self) -> None:
with self.assertRaises(AttributeError, msg="self.settings is overwritable (`=`) instead of read-only"): # noqa
self.logger.settings = 'fsaf' # type: ignore
def test_get_setting(self) -> None:
self.assertEqual(self.logger.settings.get('info'), 3, msg="settings.get() returns value from wrong key or the value has changed") # noqa
with self.assertRaises(KeyError):
self.logger.settings.get('not_existing')
@unittest.mock.patch('sys.stdout', new_callable=io.StringIO)
def test_success_msg(self, mock_stdout: Any) -> None:
msg = "anythong"
self.logger.settings.level = 5
self.logger.success(msg)
self.assertFalse(mock_stdout.getvalue())
self.logger.info(msg)
self.assertTrue(mock_stdout.getvalue())
self.logger.settings.level = 1
self.logger.success(msg)
self.assertTrue(mock_stdout.getvalue())
self.logger.info(msg)
self.assertTrue(mock_stdout.getvalue())
def test_iter_next(self) -> None:
for i2 in self.logger.settings: ...
for i1 in self.logger.settings: ...
self.assertEqual(i1, i2, msg="There is something wrong with Config.__iter__ and Config.__next__. Maybe the index (self._iter) is not refreshing correctly") # noqa
def test_log_to_file(self) -> None:
msg = 'This should be written in the file'
file = Path(f"{BASE_DIR}tests.txt")
logger = Logger(1, str(file))
# Redirect the annoying log to '/dev/null'
# (or according file for other platforms) and bring it back
std_out = sys.stdout
f = open(os.devnull, mode='w')
sys.stdout = f
logger.info(msg)
f.close()
sys.stdout = std_out
self.assertTrue(os.path.exists(file))
with open(file, mode='r') as f:
self.assertEqual(f"[INFO]: {msg}", f.read()[:-1]) # Slice to remove '\n' from the file
os.remove(file)

250
Bot/main.py Normal file
View File

@@ -0,0 +1,250 @@
# mypy: disable-error-code=attr-defined
import os
import sys
import praw # type: ignore
import time
import utils
import prawcore # type: ignore
import traceback
import datetime as dt
from pathlib import Path
from logger import Logger
from jsonwrapper import AutoSaveDict
from typing import (
Optional,
Callable,
Tuple,
List,
Set,
Any,
)
from bot import (
Datatype,
Posts,
Row,
)
def config_app(path: Path) -> AutoSaveDict:
config = {
'client_id': '',
'client_secret': '',
'user_agent': '',
'username': '',
'password': '',
'sub_name': '',
'max_days': '',
'max_posts': '',
'sleep_minutes': '',
}
configuration: List[List[str]] = []
if not os.path.exists(path):
for key, _ in config.items():
config_name = ' '.join(key.split('_')).title()
user_inp = input(f"{config_name}: ")
configuration.append([key, user_inp])
for config_name, value in configuration:
config[config_name] = value
config_handler = AutoSaveDict(
path,
**config
)
return config_handler
config_dir = Path(utils.BASE_DIR, 'config')
config_dir.mkdir(parents=True, exist_ok=True)
config_file = Path(config_dir, 'config.json')
handler = config_app(config_file)
handler.init()
posts = Posts('deleted_posts', config_dir)
logger = Logger(1)
untracked_flairs = (utils.Flair.SOLVED, utils.Flair.ABANDONED)
posts.init()
reddit = praw.Reddit(
client_id=handler['client_id'],
client_secret=handler['client_secret'],
user_agent=handler['user_agent'],
username=handler['username'],
password=handler['password'],
)
def remove_method(submission: praw.reddit.Submission) -> Optional[str]:
removed = submission.removed_by_category
if removed is not None:
# if removed in ('author', 'moderator'):
# method = 'Removed by moderator'
if removed in ('author',):
method = 'Deleted by OP'
elif removed in ('moderator',):
method = 'Removed by mod'
elif removed in ('deleted',):
method = 'Deleted by user'
else:
method = 'Uknown deletion method'
return method
return None
def send_modmail(reddit: praw.Reddit, subreddit: str, subject: str, msg: str) -> None:
# build the payload for the compose API
# Note: The caller provides subject/msg; subreddit is used for the `to` field.
data = {
"subject": subject,
"text": msg,
"to": f"/r/{subreddit}",
}
try:
print("Sending modmail via api/compose/")
reddit.post("api/compose/", data=data)
except Exception:
# fallback/report if necessary
print("Failed to send modmail with new method")
raise
def notify_if_error(func: Callable[..., int]) -> Callable[..., int]:
def wrapper(*args: Any, **kwargs: Any) -> int:
try:
return func(*args, **kwargs)
except KeyboardInterrupt:
logger.debug("\nProgram interrupted by user")
return 0
except:
author = 'https://www.reddit.com/user/kaerfkeerg'
full_error = traceback.format_exc()
bot_name = utils.BOT_NAME
msg = f"Error with '{bot_name}':\n\n{full_error}\n\nPlease report to author ({author})"
send_modmail(
reddit,
handler['sub_name'],
f'An error has occured with {utils.BOT_NAME} msg',
msg
)
return 1
return wrapper
def should_be_tracked(
flair: utils.Flair,
untracked_flairs: Tuple[utils.Flair, ...]) -> bool:
return flair not in untracked_flairs
def user_is_deleted(submission: praw.reddit.Submission) -> bool:
return submission.author is None
def check_submission(submission: praw.reddit.Submission, saved_submission_ids: Set[Row]) -> None:
if not user_is_deleted(submission) and submission.id not in saved_submission_ids:
flair = utils.get_flair(submission.link_flair_text)
method = remove_method(submission)
if should_be_tracked(flair, untracked_flairs):
if method is None and submission.author is not None:
original_post = Row(
username=submission.author.name,
title=submission.title,
text=submission.selftext,
post_id=submission.id,
deletion_method=Datatype.NULL,
post_last_edit=Datatype.NULL,
record_created=str(dt.datetime.now()),
record_edited=str(dt.datetime.now()),
)
posts.save(original_post)
@notify_if_error
def main() -> int:
# run indefinitely, sleeping between iterations
while True:
posts_to_delete: Set[Row] = set()
ignore_methods = ['Removed by mod',]
if utils.parse_cmd_line_args(sys.argv, logger, config_file, posts):
return 0
saved_submission_ids = {row.post_id for row in posts.fetch_all()}
max_posts = handler['max_posts']
limit = int(max_posts) if max_posts else None
sub_name = handler['sub_name']
for submission in reddit.subreddit(sub_name).new(limit=limit):
try:
check_submission(submission, saved_submission_ids)
except prawcore.exceptions.TooManyRequests:
time.sleep(60)
check_submission(submission, saved_submission_ids)
for stored_post in posts.fetch_all():
try:
submission = reddit.submission(id=stored_post.post_id)
max_days = int(handler['max_days'])
created = utils.string_to_dt(stored_post.record_created).date()
flair = utils.get_flair(submission.link_flair_text)
if utils.submission_is_older(created, max_days) or flair in untracked_flairs:
posts_to_delete.add(stored_post)
continue
submission = reddit.submission(id=stored_post.post_id)
method = remove_method(submission)
if user_is_deleted(submission):
if method not in ignore_methods:
send_modmail(
reddit,
handler['sub_name'],
"User's account has been deleted",
utils.modmail_removal_notification(stored_post, 'Account has been deleted')
)
posts_to_delete.add(stored_post)
elif method is not None and not stored_post.deletion_method:
if method not in ignore_methods:
stored_post.deletion_method = method
stored_post.record_edited = str(dt.datetime.now())
posts.edit(stored_post)
msg = utils.modmail_removal_notification(stored_post, method)
send_modmail(
reddit,
handler['sub_name'],
'A post has been deleted',
msg
)
posts_to_delete.add(stored_post)
time.sleep(utils.MSG_AWAIT_THRESHOLD)
if submission.selftext != stored_post.text\
or submission.selftext != stored_post.post_last_edit\
and not stored_post.deletion_method:
stored_post.post_last_edit = submission.selftext
stored_post.record_edited = str(dt.datetime.now())
posts.edit(stored_post)
except prawcore.exceptions.TooManyRequests:
time.sleep(60)
for row in posts_to_delete:
posts.delete(post_id=row.post_id)
posts_to_delete.clear()
logger.info("Program finished successfully")
logger.info(f"Total posts deleted: {len(posts_to_delete)}")
# wait before the next cycle
sleep_minutes = int(handler['sleep_minutes']) if handler['sleep_minutes'] else 5
time.sleep(sleep_minutes * 60)
# end of while True
return 0
if __name__ == '__main__':
sys.exit(
main()
)

228
Bot/main.py.bkup Normal file
View File

@@ -0,0 +1,228 @@
# mypy: disable-error-code=attr-defined
import os
import sys
import praw # type: ignore
import time
import utils
import prawcore # type: ignore
import traceback
import datetime as dt
from pathlib import Path
from logger import Logger
from jsonwrapper import AutoSaveDict
from typing import (
Optional,
Callable,
Tuple,
List,
Set,
Any,
)
from bot import (
Datatype,
Posts,
Row,
)
def config_app(path: Path) -> AutoSaveDict:
config = {
'client_id': '',
'client_secret': '',
'user_agent': '',
'username': '',
'password': '',
'sub_name': '',
'max_days': '',
'max_posts': '',
}
configuration: List[List[str]] = []
if not os.path.exists(path):
for key, _ in config.items():
config_name = ' '.join(key.split('_')).title()
user_inp = input(f"{config_name}: ")
configuration.append([key, user_inp])
for config_name, value in configuration:
config[config_name] = value
config_handler = AutoSaveDict(
path,
**config
)
return config_handler
config_file = Path(utils.BASE_DIR, 'config.json')
handler = config_app(config_file)
handler.init()
posts = Posts('post', utils.BASE_DIR)
logger = Logger(1)
untracked_flairs = (utils.Flair.SOLVED, utils.Flair.ABANDONED)
posts.init()
reddit = praw.Reddit(
client_id=handler['client_id'],
client_secret=handler['client_secret'],
user_agent=handler['user_agent'],
username=handler['username'],
password=handler['password'],
)
def remove_method(submission: praw.reddit.Submission) -> Optional[str]:
removed = submission.removed_by_category
if removed is not None:
# if removed in ('author', 'moderator'):
# method = 'Removed by moderator'
if removed in ('author',):
method = 'Deleted by OP'
elif removed in ('moderator',):
method = 'Removed by mod'
elif removed in ('deleted',):
method = 'Deleted by user'
else:
method = 'Uknown deletion method'
return method
return None
def send_modmail(reddit: praw.Reddit, subreddit: str, subject: str, msg: str) -> None:
print("Sending modmail...")
reddit.subreddit(subreddit).message(subject, msg)
print(msg)
def notify_if_error(func: Callable[..., int]) -> Callable[..., int]:
def wrapper(*args: Any, **kwargs: Any) -> int:
try:
return func(*args, **kwargs)
except KeyboardInterrupt:
logger.debug("\nProgram interrupted by user")
return 0
except:
author = 'https://www.reddit.com/user/kaerfkeerg'
full_error = traceback.format_exc()
bot_name = utils.BOT_NAME
msg = f"Error with '{bot_name}':\n\n{full_error}\n\nPlease report to author ({author})"
send_modmail(
reddit,
handler['sub_name'],
f'An error has occured with {utils.BOT_NAME} msg',
msg
)
return 1
return wrapper
def should_be_tracked(
flair: utils.Flair,
untracked_flairs: Tuple[utils.Flair, ...]) -> bool:
return flair not in untracked_flairs
def user_is_deleted(submission: praw.reddit.Submission) -> bool:
return submission.author is None
def check_submission(submission: praw.reddit.Submission, saved_submission_ids: Set[Row]) -> None:
if not user_is_deleted(submission) and submission.id not in saved_submission_ids:
flair = utils.get_flair(submission.link_flair_text)
method = remove_method(submission)
if should_be_tracked(flair, untracked_flairs):
if method is None and submission.author is not None:
original_post = Row(
username=submission.author.name,
title=submission.title,
text=submission.selftext,
post_id=submission.id,
deletion_method=Datatype.NULL,
post_last_edit=Datatype.NULL,
record_created=str(dt.datetime.now()),
record_edited=str(dt.datetime.now()),
)
posts.save(original_post)
@notify_if_error
def main() -> int:
posts_to_delete: Set[Row] = set()
ignore_methods = ['Removed by mod',]
if utils.parse_cmd_line_args(sys.argv, logger, config_file, posts):
return 0
saved_submission_ids = {row.post_id for row in posts.fetch_all()}
max_posts = handler['max_posts']
limit = int(max_posts) if max_posts else None
sub_name = handler['sub_name']
for submission in reddit.subreddit(sub_name).new(limit=limit):
try:
check_submission(submission, saved_submission_ids)
except prawcore.exceptions.TooManyRequests:
time.sleep(60)
check_submission(submission, saved_submission_ids)
for stored_post in posts.fetch_all():
try:
submission = reddit.submission(id=stored_post.post_id)
max_days = int(handler['max_days'])
created = utils.string_to_dt(stored_post.record_created).date()
flair = utils.get_flair(submission.link_flair_text)
if utils.submission_is_older(created, max_days) or flair in untracked_flairs:
posts_to_delete.add(stored_post)
continue
submission = reddit.submission(id=stored_post.post_id)
method = remove_method(submission)
if user_is_deleted(submission):
if method not in ignore_methods:
send_modmail(
reddit,
handler['sub_name'],
"User's account has been deleted",
utils.modmail_removal_notification(stored_post, 'Account has been deleted')
)
posts_to_delete.add(stored_post)
elif method is not None and not stored_post.deletion_method:
if method not in ignore_methods:
stored_post.deletion_method = method
stored_post.record_edited = str(dt.datetime.now())
posts.edit(stored_post)
msg = utils.modmail_removal_notification(stored_post, method)
send_modmail(
reddit,
handler['sub_name'],
'A post has been deleted',
msg
)
posts_to_delete.add(stored_post)
time.sleep(utils.MSG_AWAIT_THRESHOLD)
if submission.selftext != stored_post.text\
or submission.selftext != stored_post.post_last_edit\
and not stored_post.deletion_method:
stored_post.post_last_edit = submission.selftext
stored_post.record_edited = str(dt.datetime.now())
posts.edit(stored_post)
except prawcore.exceptions.TooManyRequests:
time.sleep(60)
for row in posts_to_delete:
posts.delete(post_id=row.post_id)
posts_to_delete.clear()
logger.info("Program finished successfully")
logger.info(f"Total posts deleted: {len(posts_to_delete)}")
return 0
if __name__ == '__main__':
sys.exit(
main()
)

View File

@@ -0,0 +1 @@
from .model import * # noqa

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

309
Bot/sqlitewrapper/model.py Normal file
View File

@@ -0,0 +1,309 @@
# mypy: disable-error-code=attr-defined
from __future__ import annotations
from pathlib import Path
from sqlite3 import (
Connection,
connect,
)
from typing import (
Generator,
Optional,
Tuple,
List,
Dict,
Any,
)
__all__ = (
'Row',
'Model',
'Datatype',
)
class Row:
def __init__(self, **attrs: Any) -> None:
for name, value in attrs.items():
self.__dict__[name] = value
def values(self) -> Tuple[Any, ...]:
return tuple(self.__dict__.values())
def keys(self) -> Tuple[Any, ...]:
return tuple(self.__dict__.keys())
def dict(self) -> Dict[Any, Any]:
return self.__dict__
def items(self) -> Any:
return self.__dict__.items()
def __str__(self) -> str:
return f"<Row{self.__dict__}>"
def __repr__(self) -> str:
return str(self)
def __iter__(self) -> Row:
self.__n = 0
return self
def __next__(self) -> Any:
keys = tuple(self.__dict__.keys())[:-1] # Remove the `self.__n`
if self.__n == len(keys):
raise StopIteration
data = keys[self.__n]
self.__n += 1
return data
def __getitem__(self, __k: Any) -> Any:
return self.__dict__[__k]
class Datatype:
ID = 'INTEGER PRIMARY KEY'
NULL = None
INT = 'INTEGER'
REAL = 'REAL'
STR = 'TEXT'
BLOB = 'BLOB'
class ConnectionManager:
def __init__(self, db: str) -> None:
self.db = db
self._connection = connect(self.db)
def __enter__(self) -> Connection:
return self._connection
def __exit__(self, *args: Any) -> None:
self._connection.commit()
self._connection.close()
class Model:
def __init__(self, db_name: str, save_path: Path, **table: Any) -> None:
self.name = db_name
self.path = str(Path(f"{save_path}/.{db_name}.sqlite"))
self.table = table
self.table['id'] = Datatype.ID
self.table_values = ' '.join(
f"{name} {datatype}," for (name, datatype) in table.items()
)[:-1]
def __str__(self) -> str:
data = list(self.fetch_all())
return f"{self.__class__.__name__}{data}"
def __repr__(self) -> str:
return str(self)
def __hash__(self) -> int:
return hash(self.path)
def _get_conditions(self, **where: Any) -> str:
keys = tuple(where.keys())
condition = ""
for index, key in enumerate(keys):
condition += f"{key} = ?"
if index != len(keys) - 1:
condition += " AND "
return condition
def execute(self, query: str, values: Optional[Tuple[Row, ...]] = None) -> Any:
"""Execute a query
:param query: An SQL Query
:type query: str
:param values: vales to be added, if any, defaults to None
:type values: Optional[Tuple[Row, ...]], optional
:raises Exception: If tha database has not been initialized
before trying to execute any queries
:return: Whatever the query would return
:rtype: Any
"""
with ConnectionManager(self.path) as cur:
if values is None:
data = cur.execute(query)
else:
data = cur.execute(query, values)
return data.fetchall()
def init(self) -> None:
"""Create a table based on the `self.table` (**table) kwargs
provided upon initialization
"""
query = f"""
CREATE TABLE IF NOT EXISTS {self.name} (
{self.table_values}
)
"""
self.execute(query)
def save(self, row: Row) -> None:
"""Save a row into the db. Example:
```
>>> row = Row(name='Pantelis', age=13)
>>> self.save(row)
```
:param row: A row object
:type row: Row
:raises ValueError: If the Row values does not match the db schema
"""
fields = self.table
# - 1 for the id field
if len(fields) - 1 != len(row.keys()):
raise ValueError(f"Row fields {row.keys()} do not much db schema\
{tuple(self.table.keys())[:-1]}. Consider adding 'Datatype.NULL' for the missing fields")
marks = []
for _ in row.values():
marks.append('?')
query = f"""
INSERT INTO {self.name} {row.keys()}
VALUES (
{", ".join(marks)}
)
"""
self.execute(query, row.values())
def delete(self, **where: Any) -> None:
"""Delete a row from the db. Example:
```
>>> # Query: `DELETE FROM {self.name} WHERE name = ? AND age = ?`
>>> # This will delete every row with name='John' and age=15
>>> self.delete(name='John')
```
"""
values = tuple(where.values())
condition = self._get_conditions(**where)
query = f"""
DELETE FROM {self.name}
WHERE
{condition}
"""
self.execute(query, values)
def edit(self, row: Row) -> None:
"""After you picked and changed a row, use this instead of `save` in order
for the entry to preserver the same `id`. Example:
```
>>> row = self.get(name='john')
>>> row.name = 'Mary'
>>> self.edit(row)
```
:param row: _description_
:type row: Row
"""
id = row.id
self.delete(id=id)
marks = []
for _ in row.values():
marks.append('?')
query = f"""
INSERT INTO {self.name} {row.keys()}
VALUES (
{", ".join(marks)}
)
"""
self.execute(query, row.values())
def _entries_as_rows(self, data: List[Any]) -> List[Row]:
"""Take a list of entries and convert it to a list of `Row`s
:param data: The list of entries
:type data: List[Any]
:return: A copy of the data as list or `Row`s
:rtype: List[Row]
"""
# rows = [
# <Row{'name': 'Pantelis', 'age': 12, 'id': 1}>,
# <Row{'name': 'Pantelis', 'age': 12, 'id': 2}>,
# ]
table_keys = tuple(self.table.keys())
rows = []
for row in data:
struct = {}
for index, col in enumerate(row):
struct[table_keys[index]] = col
rows.append(Row(**struct))
struct.clear()
return rows
def fetch_all(self) -> Generator[Row, None, None]:
query = f"SELECT * FROM {self.name}"
data = self.execute(query)
rows = self._entries_as_rows(data)
yield from rows
def filter(self, **where: Any) -> Generator[Row, None, None]:
"""Filter out data from the db based on the `where` conditions. Example
```
>>> data = self.filter(name='Pantelis', age=13)
>>> # Query created
>>> # SELECT * FROM test WHERE name = Pantelis AND age = 13
>>> for i in data:
... i
<Row{...}>
```
:yield: Row
:rtype: Generator[Row, None, None]
"""
# cursor.execute("SELECT * FROM my_table WHERE name = ? AND age = ?", (name, age))
values = tuple(where.values())
condition = self._get_conditions(**where)
query = f"""
SELECT * FROM {self.name}
WHERE
{condition}
"""
data = self.execute(query, values)
rows = self._entries_as_rows(data)
yield from rows
def get(self, **where: Any) -> Row:
"""Find the first occurance matching the `where` condition(s) Example:
```
>>> self.get(name="Pantelis", age=12)
<Row{...}>
```
:return: A `Row` with the values of the matching row
:rtype: Row
"""
values = tuple(where.values())
condition = self._get_conditions(**where)
query = f"""
SELECT * FROM {self.name}
WHERE
{condition}
"""
data = self.execute(query, values)[0]
row = {}
for value, name in zip(data, tuple(self.table.keys())):
row[name] = value
return Row(**row)
if __name__ == '__main__':
pass

View File

@@ -0,0 +1,83 @@
# mypy: disable-error-code=attr-defined
import unittest
import os
from pathlib import Path
from .model import (
Row,
Model,
Datatype
)
class TestRow(unittest.TestCase):
def setUp(self) -> None:
return super().setUp()
def tearDown(self) -> None:
return super().tearDown()
def test_init(self) -> None:
name, age = 'Mary', 14
row = Row(name=name, age=age)
result = row.values()
self.assertEqual(result, (name, age))
class TestModel(unittest.TestCase):
def setUp(self) -> None:
self.base_dir = Path(__file__).parent
self.name = 'testdb'
self.db = Model(
self.name,
self.base_dir,
name=Datatype.STR,
age=Datatype.INT,
)
self.db.init()
return super().setUp()
def tearDown(self) -> None:
os.remove(self.db.path)
return super().tearDown()
def test_init(self) -> None:
self.assertTrue(os.path.exists(self.db.path))
def test_save(self) -> None:
name, age = 'John', 14
self.db.save(Row(name=name, age=age))
self.db.get(name=name, age=age) # This must not raise an Exception
with self.assertRaises(ValueError):
self.db.save(Row(name='test'))
def test_delete(self) -> None:
name, age = 'John', 14
self.db.save(Row(name=name, age=age))
self.db.delete(name=name, age=age)
self.assertEqual(len(tuple(self.db.fetch_all())), 0)
def test_edit(self) -> None:
name, age = 'John', 14
self.db.save(Row(name=name, age=age))
r = self.db.get(name=name, age=age)
r.name = 'Mary'
self.db.edit(r)
self.assertEqual(len(tuple(self.db.fetch_all())), 1)
self.db.get(name='Mary', age=age) # This should not raise an exception
def test_filter(self) -> None:
data = (
('John', 14),
('Mary', 14),
('Mary', 15),
)
for i in data:
self.db.save(Row(name=i[0], age=i[1]))
age = 14
filtered = self.db.filter(age=age)
data = list(filtered) # type: ignore
self.assertTrue(all(i.age == age for i in data)) # type: ignore
self.assertEqual(len(data), 2)

2
Bot/utils/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
from .constants import * # noqa
from .actions import * # noqa

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

99
Bot/utils/actions.py Normal file
View File

@@ -0,0 +1,99 @@
# mypy: disable-error-code=attr-defined
import os
import datetime as dt
from bot import Posts
from pathlib import Path
from enum import Enum
from typing import List
from logger import Logger
from sqlitewrapper import Row
__all__ = (
'Flair',
'get_flair',
'modmail_removal_notification',
'parse_cmd_line_args',
'submission_is_older',
'string_to_dt',
)
class Flair(Enum):
SOLVED = 'Solved'
ABANDONED = 'Abandoned'
UKNOWN = 'Uknown'
def get_flair(flair: str) -> Flair:
try:
return Flair(flair)
except ValueError:
return Flair('Uknown')
def modmail_removal_notification(submission: Row, method: str) -> str:
return f"""A post has been removed
OP: `{submission.username}`
Title: {submission.title}
Post ID: https://old.reddit.com/comments/{submission.post_id}
Date created: {submission.record_created}
Date found: {submission.record_edited}
Ban Template;
[Deleted post](https://reddit.com/comments/{submission.post_id}).
Deleting an answered post, without marking it solved, is against our rules.
You can read [our rules](https://reddit.com/r/MinecraftHelp/wiki/rules) to see if you're eligible to appeal this ban."""
def parse_cmd_line_args(args: List[str], logger: Logger, config_file: Path, posts: Posts) -> bool:
help_msg = """Command line help prompt
Command: help
Args: []
Decription: Prints the help prompt
Command: reset_config
Args: []
Decription: Reset the bot credentials
Command: reset_db
Args: []
Decription: Reset the database
"""
if len(args) > 1:
if args[1] == 'help':
logger.info(help_msg)
elif args[1] == 'reset_config':
try:
os.remove(config_file)
except FileNotFoundError:
logger.error("No configuration file found")
elif args[1] == 'reset_db':
try:
os.remove(posts.path)
except FileNotFoundError:
logger.error("No database found")
else:
logger.info(help_msg)
return True
return False
def submission_is_older(submission_date: dt.date, max_days: int) -> bool:
current_date = dt.datetime.now().date()
time_difference = current_date - submission_date
if time_difference.days > max_days:
return True
return False
def string_to_dt(date_string: str) -> dt.datetime:
return dt.datetime.strptime(date_string, '%Y-%m-%d %H:%M:%S.%f')

13
Bot/utils/constants.py Normal file
View File

@@ -0,0 +1,13 @@
from pathlib import Path
__all__ = (
'BASE_DIR',
'BOT_NAME',
'MSG_AWAIT_THRESHOLD',
)
BOT_NAME = 'CraftSleuthBot'
BASE_DIR = Path(__file__).parent.parent.parent
MSG_AWAIT_THRESHOLD = 5

48
Bot/utils/tests.py Normal file
View File

@@ -0,0 +1,48 @@
import unittest
import datetime as dt
from .actions import (
Flair,
get_flair,
string_to_dt,
submission_is_older,
)
class TestActions(unittest.TestCase):
def setUp(self) -> None:
return super().setUp()
def tearDown(self) -> None:
return super().tearDown()
def test_get_flair(self) -> None:
solved = get_flair("Solved")
self.assertEqual(solved, Flair.SOLVED)
abandoned = get_flair('Abandoned')
self.assertEqual(abandoned, Flair.ABANDONED)
uknown = get_flair('Uknown')
self.assertEqual(uknown, Flair.UKNOWN)
uknown = get_flair('fsdafsd')
self.assertEqual(uknown, Flair.UKNOWN)
def test_string_to_dt(self) -> None:
datetime = dt.datetime.now()
string_dt = str(datetime)
back_to_dt = string_to_dt(string_dt)
self.assertEqual(datetime, back_to_dt)
def test_submission_is_older(self) -> None:
max_days = 7
today = dt.datetime.now()
post_made = today - dt.timedelta(days=3)
result = submission_is_older(post_made.date(), max_days)
self.assertFalse(result)
post_made = today - dt.timedelta(days=max_days)
result = submission_is_older(post_made.date(), max_days)
self.assertFalse(result)
post_made = today - dt.timedelta(days=(max_days + 1))
result = submission_is_older(post_made.date(), max_days)
self.assertTrue(result)