commit b8182ead888287cb51e30119e34d95d0897afe68 Author: Slfhstd Date: Mon Feb 23 22:48:25 2026 +0000 Initial commit diff --git a/Bot/__main__.py b/Bot/__main__.py new file mode 100644 index 0000000..c90c051 --- /dev/null +++ b/Bot/__main__.py @@ -0,0 +1,28 @@ +__author__ = 'hor00s' +__email__ = 'hor00s199@gmail.com' +__version__ = '1.0.0 beta' +__description__ = 'This bot will go through a sub and\ + alert mods through modmail if a post has been deleted' +__license__ = 'MIT' +__dependencies__ = ('praw',) + +__disclaimer__ = """Disclaimer: +This Python bot is a personal hobby project created for fun and learning purposes. +It is not intended for any commercial use or critical tasks. +While efforts have been made to ensure its functionality, there may be bugs and/or errors. +The bot is provided as-is, without any warranty or guarantee of its performance. +Use it at your own risk. + +The author and maintainers of this bot are not responsible for any damages, data loss, +or any adverse consequences that may arise from its use. +We do not collect or store any personal data or information from users.""" + + +import sys +from main import main + + +if __name__ == '__main__': + sys.exit( + main() + ) diff --git a/Bot/__pycache__/__main__.cpython-39.pyc b/Bot/__pycache__/__main__.cpython-39.pyc new file mode 100644 index 0000000..566b51c Binary files /dev/null and b/Bot/__pycache__/__main__.cpython-39.pyc differ diff --git a/Bot/__pycache__/main.cpython-39.pyc b/Bot/__pycache__/main.cpython-39.pyc new file mode 100644 index 0000000..0e6e1b8 Binary files /dev/null and b/Bot/__pycache__/main.cpython-39.pyc differ diff --git a/Bot/bot/__init__.py b/Bot/bot/__init__.py new file mode 100644 index 0000000..970613c --- /dev/null +++ b/Bot/bot/__init__.py @@ -0,0 +1 @@ +from .post import * # noqa diff --git a/Bot/bot/__pycache__/__init__.cpython-311.pyc b/Bot/bot/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000..c6c69a1 Binary files /dev/null and b/Bot/bot/__pycache__/__init__.cpython-311.pyc differ diff --git a/Bot/bot/__pycache__/__init__.cpython-39.pyc b/Bot/bot/__pycache__/__init__.cpython-39.pyc new file mode 100644 index 0000000..987b1d6 Binary files /dev/null and b/Bot/bot/__pycache__/__init__.cpython-39.pyc differ diff --git a/Bot/bot/__pycache__/post.cpython-311.pyc b/Bot/bot/__pycache__/post.cpython-311.pyc new file mode 100644 index 0000000..b414883 Binary files /dev/null and b/Bot/bot/__pycache__/post.cpython-311.pyc differ diff --git a/Bot/bot/__pycache__/post.cpython-39.pyc b/Bot/bot/__pycache__/post.cpython-39.pyc new file mode 100644 index 0000000..e0b6189 Binary files /dev/null and b/Bot/bot/__pycache__/post.cpython-39.pyc differ diff --git a/Bot/bot/post.py b/Bot/bot/post.py new file mode 100644 index 0000000..1f47a24 --- /dev/null +++ b/Bot/bot/post.py @@ -0,0 +1,24 @@ +from pathlib import Path +from sqlitewrapper import Model, Datatype, Row + + +__all__ = ( + 'Datatype', + 'Posts', + 'Row', +) + + +class Posts(Model): + def __init__(self, db_name: str, save_path: Path) -> None: + self.__table = { + 'username': Datatype.STR, + 'title': Datatype.STR, + 'text': Datatype.STR, + 'post_id': Datatype.STR, + 'deletion_method': Datatype.STR, + 'post_last_edit': Datatype.STR, + 'record_created': Datatype.STR, + 'record_edited': Datatype.STR, + } + super().__init__(db_name, save_path, **self.__table) diff --git a/Bot/jsonwrapper/__init__.py b/Bot/jsonwrapper/__init__.py new file mode 100644 index 0000000..abf7a4c --- /dev/null +++ b/Bot/jsonwrapper/__init__.py @@ -0,0 +1 @@ +from .autosavedict import * # noqa diff --git a/Bot/jsonwrapper/__pycache__/__init__.cpython-311.pyc b/Bot/jsonwrapper/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000..4953060 Binary files /dev/null and b/Bot/jsonwrapper/__pycache__/__init__.cpython-311.pyc differ diff --git a/Bot/jsonwrapper/__pycache__/__init__.cpython-39.pyc b/Bot/jsonwrapper/__pycache__/__init__.cpython-39.pyc new file mode 100644 index 0000000..b899f2c Binary files /dev/null and b/Bot/jsonwrapper/__pycache__/__init__.cpython-39.pyc differ diff --git a/Bot/jsonwrapper/__pycache__/autosavedict.cpython-311.pyc b/Bot/jsonwrapper/__pycache__/autosavedict.cpython-311.pyc new file mode 100644 index 0000000..1d122d9 Binary files /dev/null and b/Bot/jsonwrapper/__pycache__/autosavedict.cpython-311.pyc differ diff --git a/Bot/jsonwrapper/__pycache__/autosavedict.cpython-39.pyc b/Bot/jsonwrapper/__pycache__/autosavedict.cpython-39.pyc new file mode 100644 index 0000000..38d99b0 Binary files /dev/null and b/Bot/jsonwrapper/__pycache__/autosavedict.cpython-39.pyc differ diff --git a/Bot/jsonwrapper/autosavedict.py b/Bot/jsonwrapper/autosavedict.py new file mode 100644 index 0000000..d3cd6e7 --- /dev/null +++ b/Bot/jsonwrapper/autosavedict.py @@ -0,0 +1,120 @@ +from __future__ import annotations +import os +import json +from typing import ( + Any, + Dict, + Tuple, + Mapping, + Optional, + Iterable, + MutableMapping, +) + + +__all__ = ( + 'AutoSaveDict', +) + + +class AutoSaveDict(dict[Any, Any]): + def __init__(self, + file_path: Optional[os.PathLike[Any]] = None, + **pairs: Any): + self.file_path = file_path + self.__default = pairs + + if self.file_path is None: + self._pairs = pairs + elif not os.path.exists(self.file_path): + self._pairs = pairs + else: + self._pairs = self._read() + super(AutoSaveDict, self).__init__(**self._pairs) + + def __setitem__(self, __key: Any, __value: Any) -> None: + data = self._read() + data[__key] = __value + self._write(data) + super().__setitem__(__key, __value) + + def __delitem__(self, __key: Any) -> None: + data = self._read() + del data[__key] + self._write(data) + return super().__delitem__(__key) + + def __or__(self, __value: Mapping[Any, Any]) -> AutoSaveDict: + data = self._pairs | __value + return AutoSaveDict(None, **data) + + def _write(self, content: Dict[Any, Any]) -> None: + with open(self.file_path, mode='w') as f: # type: ignore + json.dump(content, f, indent=4) + self._pairs = content + + def _read(self) -> Dict[Any, Any]: + with open(self.file_path, mode='r') as f: # type: ignore + data: Dict[Any, Any] = json.load(f) + return data + + @classmethod + def fromkeys(cls, __iterable: Iterable[Any], __value: Any = None, + file_path: Optional[os.PathLike[Any]] = None) -> AutoSaveDict: + data = {} + for key in __iterable: + data[key] = __value + return cls(file_path, **data) + + @classmethod + def frommapping(cls, __mapping: Mapping[Any, Any], + file_path: Optional[os.PathLike[Any]] = None)\ + -> AutoSaveDict: + data = dict(__mapping) + return cls(file_path, **data) + + @classmethod + def fromfile(cls, src: os.PathLike[Any], + dst: Optional[os.PathLike[Any]] = None) -> AutoSaveDict: + with open(src, mode='r') as f: + data = json.load(f) + return AutoSaveDict(dst, **data) + + def init(self) -> None: + if not os.path.exists(self.file_path): # type: ignore + self._write(self._pairs) + else: + self._pairs = self._read() + + def restore(self) -> None: + self.clear() + self.update(self.__default) + self.init() + + def copy(self, + file_path: Optional[os.PathLike[Any]] = None) -> AutoSaveDict: + data = {} + for key, val in self.items(): + data[key] = val + return AutoSaveDict(file_path, **data) + + def pop(self, __key: Any) -> Any: # type: ignore + data = self._read() + data.pop(__key) + self._write(data) + return super().pop(__key) + + def popitem(self) -> Tuple[Any, Any]: + key = tuple(self._read().keys())[-1] + value = self.pop(key) + super().popitem() + return (key, value) + + def clear(self) -> None: + self._write({}) + super().clear() + + def update(self, __m: Mapping[Any, Any]) -> MutableMapping: # type: ignore + for k, v in __m.items(): + self[k] = v + super().update(__m) diff --git a/Bot/jsonwrapper/tests.py b/Bot/jsonwrapper/tests.py new file mode 100644 index 0000000..cccacbf --- /dev/null +++ b/Bot/jsonwrapper/tests.py @@ -0,0 +1,146 @@ +import os +import json +import unittest +from pathlib import Path +from .autosavedict import AutoSaveDict + + +BASE_DIR = f'{os.sep}'.join(__file__.split(os.sep)[:-1]) + + +class TestAutoSaveDict(unittest.TestCase): + def setUp(self) -> None: + self.path = Path(BASE_DIR) / 'test.json' + self.default = {'a': 1, 'b': 2} + asd = AutoSaveDict(self.path, **self.default) + asd.init() + return super().setUp() + + def tearDown(self) -> None: + os.remove(self.path) + return super().tearDown() + + def test_init(self) -> None: + asd = AutoSaveDict(self.path, **self.default) + asd.init() + self.assertTrue(os.path.exists(self.path)) + self.assertEqual(asd, self.default) + self.assertEqual(asd._pairs, self.default) + self.assertEqual(asd._read(), self.default) + + def test_restore(self) -> None: + asd = AutoSaveDict(self.path, **self.default) + asd['c'] = 3 + self.assertIn('c', asd) + asd.restore() + self.assertEqual(asd, self.default) + self.assertEqual(asd._read(), self.default) + self.assertEqual(asd._pairs, self.default) + + def test_pop(self) -> None: + asd = AutoSaveDict(self.path, **self.default) + asd.init() + + key = tuple(self.default.keys())[0] + asd.pop(key) + + self.assertNotIn(key, asd._read()) + self.assertNotIn(key, asd._pairs) + self.assertNotIn(key, asd) + + def test_popitem(self) -> None: + asd = AutoSaveDict(self.path, **self.default) + asd.init() + key = tuple(asd.keys())[-1] + val = asd[key] + result = asd.popitem() + self.assertEqual(result, (key, val)) + self.assertNotIn(key, asd) + self.assertNotIn(key, asd._read()) + self.assertNotIn(key, asd._pairs) + + def test_update(self) -> None: + asd = AutoSaveDict(self.path, **self.default) + asd.init() + new = {'b': 3} + asd.update(new) + self.assertEqual(asd['b'], new['b']) + self.assertEqual(asd._read()['b'], new['b']) + self.assertEqual(asd._pairs['b'], new['b']) + + def test_copy(self) -> None: + copy_path = 'test_copy.json' + asd = AutoSaveDict(self.path, **self.default) + asd_copy = asd.copy(copy_path) # type: ignore + asd.init() + asd_copy.init() + + self.assertEqual(asd_copy, asd) + self.assertEqual(asd_copy, asd._read()) + self.assertEqual(asd_copy, asd._pairs) + os.remove(copy_path) + + def test_fromfile(self) -> None: + file = 'test_fromfile.json' + config = {'b': 3, 'c': 4} + + with open(file, mode='w') as f: + json.dump(config, f) + os.remove(self.path) + asd = AutoSaveDict.fromfile(file, self.path) # type: ignore + asd.init() + + self.assertEqual(config, asd) + self.assertEqual(config, asd._pairs) + self.assertEqual(config, asd._read()) + self.assertIsInstance(asd, AutoSaveDict) + os.remove(file) + + def test_frommapping(self) -> None: + path = 'test_frommapping.json' + mapping = ( + ('a', 1), + ('b', 2), + ('c', 3), + ) + expected = dict(mapping) + + asd = AutoSaveDict.frommapping(mapping, path) # type: ignore + asd.init() + + self.assertEqual(expected, asd) + self.assertEqual(expected, asd._read()) + self.assertEqual(expected, asd._pairs) + os.remove(path) + + def test_fromkeys(self) -> None: + path = 'test_fromkeys.json' + keys = 'test' + expected = dict.fromkeys(keys) + + asd = AutoSaveDict.fromkeys(keys, file_path=path) # type: ignore + asd.init() + self.assertEqual(asd, expected) + self.assertEqual(asd._read(), expected) + self.assertEqual(asd._pairs, expected) + os.remove(path) + + def test_setitem(self) -> None: + expected = {**self.default, "z": 3} + asd = AutoSaveDict(self.path, **self.default) + asd['z'] = 3 + self.assertEqual(asd, expected) + self.assertEqual(asd._read(), expected) + self.assertEqual(asd._pairs, expected) + + def test_delitem(self) -> None: + config = self.default.copy() + key = tuple(config.keys())[0] + asd = AutoSaveDict(self.path, **config) + asd.init() + + del config[key] + del asd[key] + self.assertEqual(asd, config) + self.assertEqual(asd._read(), config) + self.assertEqual(asd._pairs, config) diff --git a/Bot/logger/__init__.py b/Bot/logger/__init__.py new file mode 100644 index 0000000..91743ce --- /dev/null +++ b/Bot/logger/__init__.py @@ -0,0 +1 @@ +from .logger import * # noqa diff --git a/Bot/logger/__pycache__/__init__.cpython-311.pyc b/Bot/logger/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000..b969034 Binary files /dev/null and b/Bot/logger/__pycache__/__init__.cpython-311.pyc differ diff --git a/Bot/logger/__pycache__/__init__.cpython-39.pyc b/Bot/logger/__pycache__/__init__.cpython-39.pyc new file mode 100644 index 0000000..e08e7b8 Binary files /dev/null and b/Bot/logger/__pycache__/__init__.cpython-39.pyc differ diff --git a/Bot/logger/__pycache__/logger.cpython-311.pyc b/Bot/logger/__pycache__/logger.cpython-311.pyc new file mode 100644 index 0000000..f4a7dff Binary files /dev/null and b/Bot/logger/__pycache__/logger.cpython-311.pyc differ diff --git a/Bot/logger/__pycache__/logger.cpython-39.pyc b/Bot/logger/__pycache__/logger.cpython-39.pyc new file mode 100644 index 0000000..644010e Binary files /dev/null and b/Bot/logger/__pycache__/logger.cpython-39.pyc differ diff --git a/Bot/logger/logger.py b/Bot/logger/logger.py new file mode 100644 index 0000000..01da3f0 --- /dev/null +++ b/Bot/logger/logger.py @@ -0,0 +1,320 @@ +from __future__ import annotations +import os +import sys +from enum import Enum +from inspect import currentframe +from typing import ( + Optional, + Tuple, + Dict, + List, + Any, +) + +__name__ = 'testing' + +__all__ = [ + 'UnhandledLogError', + 'get_color', + 'Logger', +] + + +class Color(Enum): + # Color end string, color reset + RESET = "\033[0m" + # Regular Colors. Normal color, no bold, background color etc. + BLACK = "\033[0;30m" # BLACK + RED = "\033[0;31m" # RED + GREEN = "\033[0;32m" # GREEN + YELLOW = "\033[0;33m" # YELLOW + BLUE = "\033[0;34m" # BLUE + MAGENTA = "\033[0;35m" # MAGENTA + CYAN = "\033[0;36m" # CYAN + WHITE = "\033[0;37m" # WHITE + # Bold colors + BLACK_BOLD = "\033[1;30m" # BLACK + RED_BOLD = "\033[1;31m" # RED + GREEN_BOLD = "\033[1;32m" # GREEN + YELLOW_BOLD = "\033[1;33m" # YELLOW + BLUE_BOLD = "\033[1;34m" # BLUE + MAGENTA_BOLD = "\033[1;35m" # MAGENTA + CYAN_BOLD = "\033[1;36m" # CYAN + WHITE_BOLD = "\033[1;37m" # WHITE + + +def get_color(color: str) -> str: + """Colors: + ``` + ( + "RESET", + "BLACK", + "RED", + "GREEN", + "YELLOW", + "BLUE", + "MAGENTA", + "CYAN", + "WHITE", + "BLACK_BOLD", + "RED_BOLD", + "GREEN_BOLD", + "YELLOW_BOLD", + "BLUE_BOLD", + "MAGENTA_BOLD", + "CYAN_BOLD", + "WHITE_BOLD", + ) + ``` + """ + return {i.name: i.value for i in Color}[color.upper()] + + +class Config: + _INSTANCE = 0 + + def __init__(self, level: int) -> None: + Config._INSTANCE += 1 + self._INSTANCE = Config._INSTANCE + self._settings = { + 'success': 2, + 'info': 3, + 'custom': 2, + 'warning': 1, + 'error': 1, + 'debug': 2, + } + self.level = level + self._iter = 0 + + def __str__(self) -> str: + return f"<{self.__class__.__name__}({self._settings})>" + + def __repr__(self) -> str: + return f"<{self.__class__.__name__}(settings={self._settings},\ + level={self._level} instance={self._INSTANCE})>" + + def __bool__(self) -> bool: + return bool(self._settings) + + def __getitem__(self, k: str) -> int: + return self._settings[k] + + def __int__(self) -> int: + return self.level + + def __len__(self) -> int: + return len(self._settings) + + def __contains__(self, __o: Any) -> bool: + return __o in self._settings + + def __iter__(self) -> Config: + return self + + def __next__(self) -> str: + keys = self.keys() + if self._iter >= len(self): + self._iter = 0 + raise StopIteration + retval = keys[self._iter] + self._iter += 1 + return retval + + @property + def settings(self) -> Dict[str, int]: + return self._settings + + @property + def level(self) -> int: + return self._level + + @level.setter + def level(self, v: int) -> None: + """Level setter validator + + :param v: The level to change to + :type v: int + :raises ValueError: If the v is invalid for `level` + """ + if not 0 < v <= 5: + raise ValueError(f"Level must be between `0-5` not `{v}`") + self._level = v + + def items(self): # type: ignore + yield self._settings.items() + + def keys(self) -> List[str]: + return list(self._settings.keys()) + + def values(self) -> List[int]: + return list(self._settings.values()) + + def update(self, **settings: int) -> None: + """Change the settings configuration for a given instance + + :raises ValueError: If the configuraion does not exist or\ + user tries to update to an invalid value + """ + if all(key in self.settings for key in settings) and\ + all(0 < settings[key] <= 5 for key in settings): + self._settings.update(settings) + else: + raise ValueError(f"Invalid key or value in {settings}. Remember,\ + key has to exists in {self.settings} and all values have to be between (1-5)") + + def get(self, key: str) -> int: + """Get a setting configuration + + :param key: Key of the configuration + :type key: str + :return: The level this configuration is set to + :rtype: int + """ + return self._settings[key] + + +class UnhandledLogError(Exception): ... + + +class MetaLogger(type): + """A simple metaclass that checks if all logs/settings are correctly + handled by comaring the ammount of settings in Config._settings agnainst + the functions that live in Logger() - some unnecessary + """ + def __new__(self, name: str, bases: + Tuple[type], attrs: Dict[str, Any]) -> type: + error_msg = "We either have a log function that is not in settings OR\ + a function that needs to be dissmissed OR a setting that is\ + not added as a log function" + + log_functions = 0 + target_log_functions = len(Config(1)) + dismiss_attrs = ('settings', 'get_line_info') + for log in attrs: + if not log.startswith('_') and log not in dismiss_attrs: + log_functions += 1 + if not log_functions == target_log_functions: + raise UnhandledLogError(error_msg) + return type(name, bases, attrs) + + +class Logger(metaclass=MetaLogger): + """The Logger class handles debbuging with colored information + based on the level. Each instance has its own settings which the + user can change independently through `self.settings.update()`. + Mind that level 1 will print evetything and level 5 less + """ + def __init__(self, level: int = 2, log_path: Optional[str] = None): + """Initializer of Logger object + + :param level: The level of debugging. Based on that, some informations\ + can be configured to not show up thus lowering the verbosity, defaults to 2 + :type level: int, optional + """ + self._settings = Config(level) + self._log_path = log_path + + def __str__(self) -> str: + return f"<{self.__class__.__name__}Object-{self._settings._INSTANCE}>" + + def __repr__(self) -> str: + return f"<{self.__class__.__name__}(instance={self._settings._INSTANCE})>" + + @property + def settings(self) -> Config: + """Getter so self._settings cannot be writable + + :return: Config instance + :rtype: Config + """ + return self._settings + + def _log(self, header: str, msg: str) -> None: + """If a file log_path is provided in `Logger.__init__` + + :param header: The msg header WARNING/ERROR ... + :type header: str + :param msg: The message to be logged + :type msg: str + """ + if self._log_path is not None: + if not os.path.exists(self._log_path): + with open(self._log_path, mode='w') as _: ... + with open(self._log_path, mode='a') as f: + f.write(f"[{header.upper()}]: {msg}\n") + + def _runner(self, func_name: str) -> bool: + """Use to check the `self.level` before printing the log + + :param func_name: Name of the function as a string + :type func_name: str + :return: Wheather the settings allow this certain function to print + :rtype: bool + """ + return self.settings.level <= self.settings[func_name] + + def get_line_info(self, file: str, prompt: str) -> str: + """Get the file and the line of where this function is called + + :param file: The file where the func is called (Recommended: `__file__`) + :type file: str + :param prompt: Any message to follow after line info + :type prompt: str + :return: *file path*, *line number* *prompt* + :rtype: str + """ + cf = currentframe() + msg = f"File \"{file}\", line {cf.f_back.f_lineno}" # type: ignore + self.debug(f"{msg} : {prompt}") + print(file) + return msg + + # ONLY LOGGING FUNCTIONS AFTER THIS + def custom(self, msg: str, header: str = 'custom', + *args: Any, color: str = get_color('reset'), **kwargs: Any) -> None: + func_name = sys._getframe().f_code.co_name + if self._runner(func_name): + print(f"{color}[{header.upper()}]: {msg}{get_color('reset')}", *args, end='', **kwargs) + print(get_color('reset')) + self._log(header, msg) + + def info(self, msg: str, *args: Any, **kwargs: Any) -> None: + func_name = sys._getframe().f_code.co_name + if self._runner(func_name): + print(f"{Color.YELLOW.value}[{func_name.upper()}]: {msg}", + *args, end='', **kwargs) + print(get_color('reset')) + self._log(func_name, msg) + + def success(self, msg: str, *args: Any, **kwargs: Any) -> None: + func_name = sys._getframe().f_code.co_name + if self._runner(func_name): + print(f"{Color.GREEN.value}[{func_name.upper()}]: {msg}", + *args, end='', **kwargs) + print(get_color('reset')) + self._log(func_name, msg) + + def warning(self, msg: str, *args: Any, **kwargs: Any) -> None: + func_name = sys._getframe().f_code.co_name + if self._runner(func_name): + print(f"{Color.RED.value}[{func_name.upper()}]: {msg}", + *args, end='', **kwargs) + print(get_color('reset')) + self._log(func_name, msg) + + def error(self, msg: str, *args: Any, **kwargs: Any) -> None: + func_name = sys._getframe().f_code.co_name + if self._runner(func_name): + print(f"{Color.RED_BOLD.value}[{func_name.upper()}]: {msg}", + *args, end='', **kwargs) + print(get_color('reset')) + self._log(func_name, msg) + + def debug(self, msg: str, *args: Any, **kwargs: Any) -> None: + func_name = sys._getframe().f_code.co_name + if self._runner(func_name): + print(f"{Color.BLUE.value}[{func_name.upper()}]: {msg}", + *args, end='', **kwargs) + print(get_color('reset')) + self._log(func_name, msg) diff --git a/Bot/logger/tests.py b/Bot/logger/tests.py new file mode 100644 index 0000000..47f224c --- /dev/null +++ b/Bot/logger/tests.py @@ -0,0 +1,123 @@ +import os +import io +import sys +import unittest +import unittest.mock +from pathlib import Path +from logger import Logger +from .logger import Config +from typing import Any + + +BASE_DIR = Path(__file__).parent + + +class TestLogger(unittest.TestCase): + def setUp(self) -> None: + self.logger = Logger(1) + + def tearDown(self) -> None: + self.logger = Logger(1) + + def test_invalid_instance(self) -> None: + with self.assertRaises(ValueError, msg='An instance with level=0 is created'): + Logger(0) + + with self.assertRaises(ValueError, msg='An instance with level=6 is created'): + Logger(6) + + logger = Logger(2) + with self.assertRaises(ValueError, msg='A settings instance changed level above limit after creation'): # noqa + logger.settings.level = 6 + + logger = Logger(2) + with self.assertRaises(ValueError, msg='A settings instance changed level above limit after creation'): # noqa + logger.settings.level = 0 + + def test_instance_counter(self) -> None: + Config._INSTANCE = 0 + l1 = Logger() + self.assertEqual(l1.settings._INSTANCE, 1, msg="First instance counter is wrong") + l2 = Logger() + self.assertEqual(l2.settings._INSTANCE, 2, msg="Second instance counter is wrong") + + self.assertEqual(l1.settings._INSTANCE, 1, msg="Repeated instance counter is wrong") + + def test_update_settings(self) -> None: + self.logger.settings.update(success=4) + self.assertEqual(self.logger.settings['success'], 4) + + with self.assertRaises(TypeError, msg="self.logger object is directly assignable"): + self.logger.settings['success'] = 4 # type: ignore + + with self.assertRaises(ValueError, msg="not existing key passed into self.settings"): + self.logger.settings.update(not_exists=4) + + with self.assertRaises(TypeError, msg="String passed as value in self.settings"): + self.logger.settings.update(success='4') # type: ignore + + with self.assertRaises(ValueError, msg='0 Passed as valid key in self.settings'): + self.logger.settings.update(success=0) + + with self.assertRaises(ValueError, msg="Above the allowed limit passed as valid key in self.settings"): # noqa + self.logger.settings.update(success=6) + + with self.assertRaises(AttributeError, msg="self.settings is overwritable (`=`)"): + self.logger.settings.settings = 'fsaf' # type: ignore + + self.logger.settings.update(success=2, info=1) + self.assertEqual(self.logger.settings['success'], 2, msg='Error in updating 2 values at once at `success`') # noqa + self.assertEqual(self.logger.settings['info'], 1, msg='Error in updating 2 values at once at `info`') # noqa + + def test_get_settings(self) -> None: + with self.assertRaises(AttributeError, msg="self.settings is overwritable (`=`) instead of read-only"): # noqa + self.logger.settings = 'fsaf' # type: ignore + + def test_get_setting(self) -> None: + self.assertEqual(self.logger.settings.get('info'), 3, msg="settings.get() returns value from wrong key or the value has changed") # noqa + with self.assertRaises(KeyError): + self.logger.settings.get('not_existing') + + @unittest.mock.patch('sys.stdout', new_callable=io.StringIO) + def test_success_msg(self, mock_stdout: Any) -> None: + msg = "anythong" + + self.logger.settings.level = 5 + self.logger.success(msg) + self.assertFalse(mock_stdout.getvalue()) + + self.logger.info(msg) + self.assertTrue(mock_stdout.getvalue()) + + self.logger.settings.level = 1 + self.logger.success(msg) + self.assertTrue(mock_stdout.getvalue()) + + self.logger.info(msg) + self.assertTrue(mock_stdout.getvalue()) + + def test_iter_next(self) -> None: + for i2 in self.logger.settings: ... + + for i1 in self.logger.settings: ... + + self.assertEqual(i1, i2, msg="There is something wrong with Config.__iter__ and Config.__next__. Maybe the index (self._iter) is not refreshing correctly") # noqa + + def test_log_to_file(self) -> None: + msg = 'This should be written in the file' + file = Path(f"{BASE_DIR}tests.txt") + logger = Logger(1, str(file)) + + # Redirect the annoying log to '/dev/null' + # (or according file for other platforms) and bring it back + std_out = sys.stdout + f = open(os.devnull, mode='w') + sys.stdout = f + logger.info(msg) + f.close() + sys.stdout = std_out + + self.assertTrue(os.path.exists(file)) + with open(file, mode='r') as f: + self.assertEqual(f"[INFO]: {msg}", f.read()[:-1]) # Slice to remove '\n' from the file + os.remove(file) diff --git a/Bot/main.py b/Bot/main.py new file mode 100644 index 0000000..28402e8 --- /dev/null +++ b/Bot/main.py @@ -0,0 +1,250 @@ +# mypy: disable-error-code=attr-defined +import os +import sys +import praw # type: ignore +import time +import utils +import prawcore # type: ignore +import traceback +import datetime as dt +from pathlib import Path +from logger import Logger +from jsonwrapper import AutoSaveDict +from typing import ( + Optional, + Callable, + Tuple, + List, + Set, + Any, +) +from bot import ( + Datatype, + Posts, + Row, +) + + +def config_app(path: Path) -> AutoSaveDict: + config = { + 'client_id': '', + 'client_secret': '', + 'user_agent': '', + 'username': '', + 'password': '', + 'sub_name': '', + 'max_days': '', + 'max_posts': '', + 'sleep_minutes': '', + } + + configuration: List[List[str]] = [] + + if not os.path.exists(path): + for key, _ in config.items(): + config_name = ' '.join(key.split('_')).title() + user_inp = input(f"{config_name}: ") + configuration.append([key, user_inp]) + + for config_name, value in configuration: + config[config_name] = value + + config_handler = AutoSaveDict( + path, + **config + ) + return config_handler + + +config_dir = Path(utils.BASE_DIR, 'config') +config_dir.mkdir(parents=True, exist_ok=True) +config_file = Path(config_dir, 'config.json') +handler = config_app(config_file) +handler.init() +posts = Posts('deleted_posts', config_dir) +logger = Logger(1) +untracked_flairs = (utils.Flair.SOLVED, utils.Flair.ABANDONED) +posts.init() +reddit = praw.Reddit( + client_id=handler['client_id'], + client_secret=handler['client_secret'], + user_agent=handler['user_agent'], + username=handler['username'], + password=handler['password'], +) + + +def remove_method(submission: praw.reddit.Submission) -> Optional[str]: + removed = submission.removed_by_category + if removed is not None: + # if removed in ('author', 'moderator'): + # method = 'Removed by moderator' + if removed in ('author',): + method = 'Deleted by OP' + elif removed in ('moderator',): + method = 'Removed by mod' + elif removed in ('deleted',): + method = 'Deleted by user' + else: + method = 'Uknown deletion method' + return method + + return None + + +def send_modmail(reddit: praw.Reddit, subreddit: str, subject: str, msg: str) -> None: + # build the payload for the compose API + # Note: The caller provides subject/msg; subreddit is used for the `to` field. + data = { + "subject": subject, + "text": msg, + "to": f"/r/{subreddit}", + } + try: + print("Sending modmail via api/compose/") + reddit.post("api/compose/", data=data) + except Exception: + # fallback/report if necessary + print("Failed to send modmail with new method") + raise + + +def notify_if_error(func: Callable[..., int]) -> Callable[..., int]: + def wrapper(*args: Any, **kwargs: Any) -> int: + try: + return func(*args, **kwargs) + except KeyboardInterrupt: + logger.debug("\nProgram interrupted by user") + return 0 + except: + author = 'https://www.reddit.com/user/kaerfkeerg' + full_error = traceback.format_exc() + bot_name = utils.BOT_NAME + msg = f"Error with '{bot_name}':\n\n{full_error}\n\nPlease report to author ({author})" + send_modmail( + reddit, + handler['sub_name'], + f'An error has occured with {utils.BOT_NAME} msg', + msg + ) + return 1 + return wrapper + + +def should_be_tracked( + flair: utils.Flair, + untracked_flairs: Tuple[utils.Flair, ...]) -> bool: + return flair not in untracked_flairs + + +def user_is_deleted(submission: praw.reddit.Submission) -> bool: + return submission.author is None + + +def check_submission(submission: praw.reddit.Submission, saved_submission_ids: Set[Row]) -> None: + if not user_is_deleted(submission) and submission.id not in saved_submission_ids: + flair = utils.get_flair(submission.link_flair_text) + method = remove_method(submission) + if should_be_tracked(flair, untracked_flairs): + if method is None and submission.author is not None: + original_post = Row( + username=submission.author.name, + title=submission.title, + text=submission.selftext, + post_id=submission.id, + deletion_method=Datatype.NULL, + post_last_edit=Datatype.NULL, + record_created=str(dt.datetime.now()), + record_edited=str(dt.datetime.now()), + ) + posts.save(original_post) + + +@notify_if_error +def main() -> int: + # run indefinitely, sleeping between iterations + while True: + posts_to_delete: Set[Row] = set() + ignore_methods = ['Removed by mod',] + + if utils.parse_cmd_line_args(sys.argv, logger, config_file, posts): + return 0 + + saved_submission_ids = {row.post_id for row in posts.fetch_all()} + max_posts = handler['max_posts'] + limit = int(max_posts) if max_posts else None + sub_name = handler['sub_name'] + + for submission in reddit.subreddit(sub_name).new(limit=limit): + try: + check_submission(submission, saved_submission_ids) + except prawcore.exceptions.TooManyRequests: + time.sleep(60) + check_submission(submission, saved_submission_ids) + + for stored_post in posts.fetch_all(): + try: + submission = reddit.submission(id=stored_post.post_id) + max_days = int(handler['max_days']) + created = utils.string_to_dt(stored_post.record_created).date() + flair = utils.get_flair(submission.link_flair_text) + + if utils.submission_is_older(created, max_days) or flair in untracked_flairs: + posts_to_delete.add(stored_post) + continue + + submission = reddit.submission(id=stored_post.post_id) + method = remove_method(submission) + if user_is_deleted(submission): + if method not in ignore_methods: + send_modmail( + reddit, + handler['sub_name'], + "User's account has been deleted", + utils.modmail_removal_notification(stored_post, 'Account has been deleted') + ) + posts_to_delete.add(stored_post) + + elif method is not None and not stored_post.deletion_method: + if method not in ignore_methods: + stored_post.deletion_method = method + stored_post.record_edited = str(dt.datetime.now()) + posts.edit(stored_post) + msg = utils.modmail_removal_notification(stored_post, method) + send_modmail( + reddit, + handler['sub_name'], + 'A post has been deleted', + msg + ) + posts_to_delete.add(stored_post) + time.sleep(utils.MSG_AWAIT_THRESHOLD) + + if submission.selftext != stored_post.text\ + or submission.selftext != stored_post.post_last_edit\ + and not stored_post.deletion_method: + stored_post.post_last_edit = submission.selftext + stored_post.record_edited = str(dt.datetime.now()) + posts.edit(stored_post) + except prawcore.exceptions.TooManyRequests: + time.sleep(60) + + for row in posts_to_delete: + posts.delete(post_id=row.post_id) + + posts_to_delete.clear() + logger.info("Program finished successfully") + logger.info(f"Total posts deleted: {len(posts_to_delete)}") + + # wait before the next cycle + sleep_minutes = int(handler['sleep_minutes']) if handler['sleep_minutes'] else 5 + time.sleep(sleep_minutes * 60) + + # end of while True + return 0 + + +if __name__ == '__main__': + sys.exit( + main() + ) \ No newline at end of file diff --git a/Bot/main.py.bkup b/Bot/main.py.bkup new file mode 100644 index 0000000..094c60e --- /dev/null +++ b/Bot/main.py.bkup @@ -0,0 +1,228 @@ +# mypy: disable-error-code=attr-defined +import os +import sys +import praw # type: ignore +import time +import utils +import prawcore # type: ignore +import traceback +import datetime as dt +from pathlib import Path +from logger import Logger +from jsonwrapper import AutoSaveDict +from typing import ( + Optional, + Callable, + Tuple, + List, + Set, + Any, +) +from bot import ( + Datatype, + Posts, + Row, +) + + +def config_app(path: Path) -> AutoSaveDict: + config = { + 'client_id': '', + 'client_secret': '', + 'user_agent': '', + 'username': '', + 'password': '', + 'sub_name': '', + 'max_days': '', + 'max_posts': '', + } + + configuration: List[List[str]] = [] + + if not os.path.exists(path): + for key, _ in config.items(): + config_name = ' '.join(key.split('_')).title() + user_inp = input(f"{config_name}: ") + configuration.append([key, user_inp]) + + for config_name, value in configuration: + config[config_name] = value + + config_handler = AutoSaveDict( + path, + **config + ) + return config_handler + + +config_file = Path(utils.BASE_DIR, 'config.json') +handler = config_app(config_file) +handler.init() +posts = Posts('post', utils.BASE_DIR) +logger = Logger(1) +untracked_flairs = (utils.Flair.SOLVED, utils.Flair.ABANDONED) +posts.init() +reddit = praw.Reddit( + client_id=handler['client_id'], + client_secret=handler['client_secret'], + user_agent=handler['user_agent'], + username=handler['username'], + password=handler['password'], +) + + +def remove_method(submission: praw.reddit.Submission) -> Optional[str]: + removed = submission.removed_by_category + if removed is not None: + # if removed in ('author', 'moderator'): + # method = 'Removed by moderator' + if removed in ('author',): + method = 'Deleted by OP' + elif removed in ('moderator',): + method = 'Removed by mod' + elif removed in ('deleted',): + method = 'Deleted by user' + else: + method = 'Uknown deletion method' + return method + + return None + + +def send_modmail(reddit: praw.Reddit, subreddit: str, subject: str, msg: str) -> None: + print("Sending modmail...") + reddit.subreddit(subreddit).message(subject, msg) + print(msg) + + +def notify_if_error(func: Callable[..., int]) -> Callable[..., int]: + def wrapper(*args: Any, **kwargs: Any) -> int: + try: + return func(*args, **kwargs) + except KeyboardInterrupt: + logger.debug("\nProgram interrupted by user") + return 0 + except: + author = 'https://www.reddit.com/user/kaerfkeerg' + full_error = traceback.format_exc() + bot_name = utils.BOT_NAME + msg = f"Error with '{bot_name}':\n\n{full_error}\n\nPlease report to author ({author})" + send_modmail( + reddit, + handler['sub_name'], + f'An error has occured with {utils.BOT_NAME} msg', + msg + ) + return 1 + return wrapper + + +def should_be_tracked( + flair: utils.Flair, + untracked_flairs: Tuple[utils.Flair, ...]) -> bool: + return flair not in untracked_flairs + + +def user_is_deleted(submission: praw.reddit.Submission) -> bool: + return submission.author is None + + +def check_submission(submission: praw.reddit.Submission, saved_submission_ids: Set[Row]) -> None: + if not user_is_deleted(submission) and submission.id not in saved_submission_ids: + flair = utils.get_flair(submission.link_flair_text) + method = remove_method(submission) + if should_be_tracked(flair, untracked_flairs): + if method is None and submission.author is not None: + original_post = Row( + username=submission.author.name, + title=submission.title, + text=submission.selftext, + post_id=submission.id, + deletion_method=Datatype.NULL, + post_last_edit=Datatype.NULL, + record_created=str(dt.datetime.now()), + record_edited=str(dt.datetime.now()), + ) + posts.save(original_post) + + +@notify_if_error +def main() -> int: + posts_to_delete: Set[Row] = set() + ignore_methods = ['Removed by mod',] + + if utils.parse_cmd_line_args(sys.argv, logger, config_file, posts): + return 0 + + saved_submission_ids = {row.post_id for row in posts.fetch_all()} + max_posts = handler['max_posts'] + limit = int(max_posts) if max_posts else None + sub_name = handler['sub_name'] + + for submission in reddit.subreddit(sub_name).new(limit=limit): + try: + check_submission(submission, saved_submission_ids) + except prawcore.exceptions.TooManyRequests: + time.sleep(60) + check_submission(submission, saved_submission_ids) + + for stored_post in posts.fetch_all(): + try: + submission = reddit.submission(id=stored_post.post_id) + max_days = int(handler['max_days']) + created = utils.string_to_dt(stored_post.record_created).date() + flair = utils.get_flair(submission.link_flair_text) + + if utils.submission_is_older(created, max_days) or flair in untracked_flairs: + posts_to_delete.add(stored_post) + continue + + submission = reddit.submission(id=stored_post.post_id) + method = remove_method(submission) + if user_is_deleted(submission): + if method not in ignore_methods: + send_modmail( + reddit, + handler['sub_name'], + "User's account has been deleted", + utils.modmail_removal_notification(stored_post, 'Account has been deleted') + ) + posts_to_delete.add(stored_post) + + elif method is not None and not stored_post.deletion_method: + if method not in ignore_methods: + stored_post.deletion_method = method + stored_post.record_edited = str(dt.datetime.now()) + posts.edit(stored_post) + msg = utils.modmail_removal_notification(stored_post, method) + send_modmail( + reddit, + handler['sub_name'], + 'A post has been deleted', + msg + ) + posts_to_delete.add(stored_post) + time.sleep(utils.MSG_AWAIT_THRESHOLD) + + if submission.selftext != stored_post.text\ + or submission.selftext != stored_post.post_last_edit\ + and not stored_post.deletion_method: + stored_post.post_last_edit = submission.selftext + stored_post.record_edited = str(dt.datetime.now()) + posts.edit(stored_post) + except prawcore.exceptions.TooManyRequests: + time.sleep(60) + + for row in posts_to_delete: + posts.delete(post_id=row.post_id) + + posts_to_delete.clear() + logger.info("Program finished successfully") + logger.info(f"Total posts deleted: {len(posts_to_delete)}") + return 0 + + +if __name__ == '__main__': + sys.exit( + main() + ) diff --git a/Bot/sqlitewrapper/__init__.py b/Bot/sqlitewrapper/__init__.py new file mode 100644 index 0000000..94cd79c --- /dev/null +++ b/Bot/sqlitewrapper/__init__.py @@ -0,0 +1 @@ +from .model import * # noqa diff --git a/Bot/sqlitewrapper/__pycache__/__init__.cpython-311.pyc b/Bot/sqlitewrapper/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000..9acdc38 Binary files /dev/null and b/Bot/sqlitewrapper/__pycache__/__init__.cpython-311.pyc differ diff --git a/Bot/sqlitewrapper/__pycache__/__init__.cpython-39.pyc b/Bot/sqlitewrapper/__pycache__/__init__.cpython-39.pyc new file mode 100644 index 0000000..1523d81 Binary files /dev/null and b/Bot/sqlitewrapper/__pycache__/__init__.cpython-39.pyc differ diff --git a/Bot/sqlitewrapper/__pycache__/model.cpython-311.pyc b/Bot/sqlitewrapper/__pycache__/model.cpython-311.pyc new file mode 100644 index 0000000..f88815e Binary files /dev/null and b/Bot/sqlitewrapper/__pycache__/model.cpython-311.pyc differ diff --git a/Bot/sqlitewrapper/__pycache__/model.cpython-39.pyc b/Bot/sqlitewrapper/__pycache__/model.cpython-39.pyc new file mode 100644 index 0000000..941df62 Binary files /dev/null and b/Bot/sqlitewrapper/__pycache__/model.cpython-39.pyc differ diff --git a/Bot/sqlitewrapper/model.py b/Bot/sqlitewrapper/model.py new file mode 100644 index 0000000..dad6c63 --- /dev/null +++ b/Bot/sqlitewrapper/model.py @@ -0,0 +1,309 @@ +# mypy: disable-error-code=attr-defined +from __future__ import annotations +from pathlib import Path +from sqlite3 import ( + Connection, + connect, +) +from typing import ( + Generator, + Optional, + Tuple, + List, + Dict, + Any, +) + + +__all__ = ( + 'Row', + 'Model', + 'Datatype', +) + + +class Row: + def __init__(self, **attrs: Any) -> None: + for name, value in attrs.items(): + self.__dict__[name] = value + + def values(self) -> Tuple[Any, ...]: + return tuple(self.__dict__.values()) + + def keys(self) -> Tuple[Any, ...]: + return tuple(self.__dict__.keys()) + + def dict(self) -> Dict[Any, Any]: + return self.__dict__ + + def items(self) -> Any: + return self.__dict__.items() + + def __str__(self) -> str: + return f"" + + def __repr__(self) -> str: + return str(self) + + def __iter__(self) -> Row: + self.__n = 0 + return self + + def __next__(self) -> Any: + keys = tuple(self.__dict__.keys())[:-1] # Remove the `self.__n` + if self.__n == len(keys): + raise StopIteration + data = keys[self.__n] + self.__n += 1 + return data + + def __getitem__(self, __k: Any) -> Any: + return self.__dict__[__k] + + +class Datatype: + ID = 'INTEGER PRIMARY KEY' + NULL = None + INT = 'INTEGER' + REAL = 'REAL' + STR = 'TEXT' + BLOB = 'BLOB' + + +class ConnectionManager: + def __init__(self, db: str) -> None: + self.db = db + self._connection = connect(self.db) + + def __enter__(self) -> Connection: + return self._connection + + def __exit__(self, *args: Any) -> None: + self._connection.commit() + self._connection.close() + + +class Model: + def __init__(self, db_name: str, save_path: Path, **table: Any) -> None: + self.name = db_name + self.path = str(Path(f"{save_path}/.{db_name}.sqlite")) + self.table = table + self.table['id'] = Datatype.ID + + self.table_values = ' '.join( + f"{name} {datatype}," for (name, datatype) in table.items() + )[:-1] + + def __str__(self) -> str: + data = list(self.fetch_all()) + return f"{self.__class__.__name__}{data}" + + def __repr__(self) -> str: + return str(self) + + def __hash__(self) -> int: + return hash(self.path) + + def _get_conditions(self, **where: Any) -> str: + keys = tuple(where.keys()) + + condition = "" + for index, key in enumerate(keys): + condition += f"{key} = ?" + if index != len(keys) - 1: + condition += " AND " + + return condition + + def execute(self, query: str, values: Optional[Tuple[Row, ...]] = None) -> Any: + """Execute a query + + :param query: An SQL Query + :type query: str + :param values: vales to be added, if any, defaults to None + :type values: Optional[Tuple[Row, ...]], optional + :raises Exception: If tha database has not been initialized + before trying to execute any queries + :return: Whatever the query would return + :rtype: Any + """ + with ConnectionManager(self.path) as cur: + if values is None: + data = cur.execute(query) + else: + data = cur.execute(query, values) + return data.fetchall() + + def init(self) -> None: + """Create a table based on the `self.table` (**table) kwargs + provided upon initialization + """ + query = f""" + CREATE TABLE IF NOT EXISTS {self.name} ( + {self.table_values} + ) + """ + self.execute(query) + + def save(self, row: Row) -> None: + """Save a row into the db. Example: + ``` + >>> row = Row(name='Pantelis', age=13) + >>> self.save(row) + ``` + + :param row: A row object + :type row: Row + :raises ValueError: If the Row values does not match the db schema + """ + fields = self.table + # - 1 for the id field + if len(fields) - 1 != len(row.keys()): + raise ValueError(f"Row fields {row.keys()} do not much db schema\ + {tuple(self.table.keys())[:-1]}. Consider adding 'Datatype.NULL' for the missing fields") + + marks = [] + for _ in row.values(): + marks.append('?') + + query = f""" + INSERT INTO {self.name} {row.keys()} + VALUES ( + {", ".join(marks)} + ) + """ + self.execute(query, row.values()) + + def delete(self, **where: Any) -> None: + """Delete a row from the db. Example: + ``` + >>> # Query: `DELETE FROM {self.name} WHERE name = ? AND age = ?` + >>> # This will delete every row with name='John' and age=15 + >>> self.delete(name='John') + ``` + """ + values = tuple(where.values()) + condition = self._get_conditions(**where) + + query = f""" + DELETE FROM {self.name} + WHERE + {condition} + """ + self.execute(query, values) + + def edit(self, row: Row) -> None: + """After you picked and changed a row, use this instead of `save` in order + for the entry to preserver the same `id`. Example: + ``` + >>> row = self.get(name='john') + >>> row.name = 'Mary' + >>> self.edit(row) + ``` + + :param row: _description_ + :type row: Row + """ + id = row.id + self.delete(id=id) + + marks = [] + for _ in row.values(): + marks.append('?') + + query = f""" + INSERT INTO {self.name} {row.keys()} + VALUES ( + {", ".join(marks)} + ) + """ + + self.execute(query, row.values()) + + def _entries_as_rows(self, data: List[Any]) -> List[Row]: + """Take a list of entries and convert it to a list of `Row`s + + :param data: The list of entries + :type data: List[Any] + :return: A copy of the data as list or `Row`s + :rtype: List[Row] + """ + # rows = [ + # , + # , + # ] + table_keys = tuple(self.table.keys()) + rows = [] + + for row in data: + struct = {} + for index, col in enumerate(row): + struct[table_keys[index]] = col + rows.append(Row(**struct)) + struct.clear() + + return rows + + def fetch_all(self) -> Generator[Row, None, None]: + query = f"SELECT * FROM {self.name}" + data = self.execute(query) + + rows = self._entries_as_rows(data) + yield from rows + + def filter(self, **where: Any) -> Generator[Row, None, None]: + """Filter out data from the db based on the `where` conditions. Example + ``` + >>> data = self.filter(name='Pantelis', age=13) + >>> # Query created + >>> # SELECT * FROM test WHERE name = Pantelis AND age = 13 + >>> for i in data: + ... i + + ``` + + :yield: Row + :rtype: Generator[Row, None, None] + """ + # cursor.execute("SELECT * FROM my_table WHERE name = ? AND age = ?", (name, age)) + values = tuple(where.values()) + condition = self._get_conditions(**where) + + query = f""" + SELECT * FROM {self.name} + WHERE + {condition} + """ + + data = self.execute(query, values) + rows = self._entries_as_rows(data) + yield from rows + + def get(self, **where: Any) -> Row: + """Find the first occurance matching the `where` condition(s) Example: + ``` + >>> self.get(name="Pantelis", age=12) + + ``` + + :return: A `Row` with the values of the matching row + :rtype: Row + """ + values = tuple(where.values()) + + condition = self._get_conditions(**where) + + query = f""" + SELECT * FROM {self.name} + WHERE + {condition} + """ + data = self.execute(query, values)[0] + row = {} + for value, name in zip(data, tuple(self.table.keys())): + row[name] = value + return Row(**row) + + +if __name__ == '__main__': + pass diff --git a/Bot/sqlitewrapper/tests.py b/Bot/sqlitewrapper/tests.py new file mode 100644 index 0000000..175c730 --- /dev/null +++ b/Bot/sqlitewrapper/tests.py @@ -0,0 +1,83 @@ +# mypy: disable-error-code=attr-defined +import unittest +import os +from pathlib import Path +from .model import ( + Row, + Model, + Datatype +) + + +class TestRow(unittest.TestCase): + def setUp(self) -> None: + return super().setUp() + + def tearDown(self) -> None: + return super().tearDown() + + def test_init(self) -> None: + name, age = 'Mary', 14 + row = Row(name=name, age=age) + result = row.values() + self.assertEqual(result, (name, age)) + + +class TestModel(unittest.TestCase): + def setUp(self) -> None: + self.base_dir = Path(__file__).parent + self.name = 'testdb' + self.db = Model( + self.name, + self.base_dir, + name=Datatype.STR, + age=Datatype.INT, + ) + self.db.init() + return super().setUp() + + def tearDown(self) -> None: + os.remove(self.db.path) + return super().tearDown() + + def test_init(self) -> None: + self.assertTrue(os.path.exists(self.db.path)) + + def test_save(self) -> None: + name, age = 'John', 14 + self.db.save(Row(name=name, age=age)) + self.db.get(name=name, age=age) # This must not raise an Exception + + with self.assertRaises(ValueError): + self.db.save(Row(name='test')) + + def test_delete(self) -> None: + name, age = 'John', 14 + self.db.save(Row(name=name, age=age)) + self.db.delete(name=name, age=age) + self.assertEqual(len(tuple(self.db.fetch_all())), 0) + + def test_edit(self) -> None: + name, age = 'John', 14 + self.db.save(Row(name=name, age=age)) + r = self.db.get(name=name, age=age) + r.name = 'Mary' + self.db.edit(r) + self.assertEqual(len(tuple(self.db.fetch_all())), 1) + self.db.get(name='Mary', age=age) # This should not raise an exception + + def test_filter(self) -> None: + data = ( + ('John', 14), + ('Mary', 14), + ('Mary', 15), + ) + + for i in data: + self.db.save(Row(name=i[0], age=i[1])) + + age = 14 + filtered = self.db.filter(age=age) + data = list(filtered) # type: ignore + self.assertTrue(all(i.age == age for i in data)) # type: ignore + self.assertEqual(len(data), 2) diff --git a/Bot/utils/__init__.py b/Bot/utils/__init__.py new file mode 100644 index 0000000..49b8741 --- /dev/null +++ b/Bot/utils/__init__.py @@ -0,0 +1,2 @@ +from .constants import * # noqa +from .actions import * # noqa diff --git a/Bot/utils/__pycache__/__init__.cpython-311.pyc b/Bot/utils/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000..f734f85 Binary files /dev/null and b/Bot/utils/__pycache__/__init__.cpython-311.pyc differ diff --git a/Bot/utils/__pycache__/__init__.cpython-39.pyc b/Bot/utils/__pycache__/__init__.cpython-39.pyc new file mode 100644 index 0000000..62e4418 Binary files /dev/null and b/Bot/utils/__pycache__/__init__.cpython-39.pyc differ diff --git a/Bot/utils/__pycache__/actions.cpython-311.pyc b/Bot/utils/__pycache__/actions.cpython-311.pyc new file mode 100644 index 0000000..c1235b8 Binary files /dev/null and b/Bot/utils/__pycache__/actions.cpython-311.pyc differ diff --git a/Bot/utils/__pycache__/actions.cpython-39.pyc b/Bot/utils/__pycache__/actions.cpython-39.pyc new file mode 100644 index 0000000..f8b3798 Binary files /dev/null and b/Bot/utils/__pycache__/actions.cpython-39.pyc differ diff --git a/Bot/utils/__pycache__/constants.cpython-311.pyc b/Bot/utils/__pycache__/constants.cpython-311.pyc new file mode 100644 index 0000000..c653a28 Binary files /dev/null and b/Bot/utils/__pycache__/constants.cpython-311.pyc differ diff --git a/Bot/utils/__pycache__/constants.cpython-39.pyc b/Bot/utils/__pycache__/constants.cpython-39.pyc new file mode 100644 index 0000000..8d3f1ac Binary files /dev/null and b/Bot/utils/__pycache__/constants.cpython-39.pyc differ diff --git a/Bot/utils/actions.py b/Bot/utils/actions.py new file mode 100644 index 0000000..ab1ce5e --- /dev/null +++ b/Bot/utils/actions.py @@ -0,0 +1,99 @@ +# mypy: disable-error-code=attr-defined +import os +import datetime as dt +from bot import Posts +from pathlib import Path +from enum import Enum +from typing import List +from logger import Logger +from sqlitewrapper import Row + + +__all__ = ( + 'Flair', + 'get_flair', + 'modmail_removal_notification', + 'parse_cmd_line_args', + 'submission_is_older', + 'string_to_dt', +) + + +class Flair(Enum): + SOLVED = 'Solved' + ABANDONED = 'Abandoned' + UKNOWN = 'Uknown' + + +def get_flair(flair: str) -> Flair: + try: + return Flair(flair) + except ValueError: + return Flair('Uknown') + + +def modmail_removal_notification(submission: Row, method: str) -> str: + return f"""A post has been removed + +OP: `{submission.username}` + +Title: {submission.title} + +Post ID: https://old.reddit.com/comments/{submission.post_id} + +Date created: {submission.record_created} + +Date found: {submission.record_edited} + +Ban Template; + + [Deleted post](https://reddit.com/comments/{submission.post_id}). + + Deleting an answered post, without marking it solved, is against our rules. + + You can read [our rules](https://reddit.com/r/MinecraftHelp/wiki/rules) to see if you're eligible to appeal this ban.""" + + +def parse_cmd_line_args(args: List[str], logger: Logger, config_file: Path, posts: Posts) -> bool: + help_msg = """Command line help prompt + Command: help + Args: [] + Decription: Prints the help prompt + + Command: reset_config + Args: [] + Decription: Reset the bot credentials + + Command: reset_db + Args: [] + Decription: Reset the database +""" + if len(args) > 1: + if args[1] == 'help': + logger.info(help_msg) + elif args[1] == 'reset_config': + try: + os.remove(config_file) + except FileNotFoundError: + logger.error("No configuration file found") + elif args[1] == 'reset_db': + try: + os.remove(posts.path) + except FileNotFoundError: + logger.error("No database found") + else: + logger.info(help_msg) + return True + return False + + +def submission_is_older(submission_date: dt.date, max_days: int) -> bool: + current_date = dt.datetime.now().date() + time_difference = current_date - submission_date + if time_difference.days > max_days: + return True + return False + + +def string_to_dt(date_string: str) -> dt.datetime: + return dt.datetime.strptime(date_string, '%Y-%m-%d %H:%M:%S.%f') diff --git a/Bot/utils/constants.py b/Bot/utils/constants.py new file mode 100644 index 0000000..e8ac1e6 --- /dev/null +++ b/Bot/utils/constants.py @@ -0,0 +1,13 @@ +from pathlib import Path + + +__all__ = ( + 'BASE_DIR', + 'BOT_NAME', + 'MSG_AWAIT_THRESHOLD', +) + + +BOT_NAME = 'CraftSleuthBot' +BASE_DIR = Path(__file__).parent.parent.parent +MSG_AWAIT_THRESHOLD = 5 diff --git a/Bot/utils/tests.py b/Bot/utils/tests.py new file mode 100644 index 0000000..14213c0 --- /dev/null +++ b/Bot/utils/tests.py @@ -0,0 +1,48 @@ +import unittest +import datetime as dt +from .actions import ( + Flair, + get_flair, + string_to_dt, + submission_is_older, +) + + +class TestActions(unittest.TestCase): + def setUp(self) -> None: + return super().setUp() + + def tearDown(self) -> None: + return super().tearDown() + + def test_get_flair(self) -> None: + solved = get_flair("Solved") + self.assertEqual(solved, Flair.SOLVED) + abandoned = get_flair('Abandoned') + self.assertEqual(abandoned, Flair.ABANDONED) + uknown = get_flair('Uknown') + self.assertEqual(uknown, Flair.UKNOWN) + uknown = get_flair('fsdafsd') + self.assertEqual(uknown, Flair.UKNOWN) + + def test_string_to_dt(self) -> None: + datetime = dt.datetime.now() + string_dt = str(datetime) + back_to_dt = string_to_dt(string_dt) + self.assertEqual(datetime, back_to_dt) + + def test_submission_is_older(self) -> None: + max_days = 7 + today = dt.datetime.now() + + post_made = today - dt.timedelta(days=3) + result = submission_is_older(post_made.date(), max_days) + self.assertFalse(result) + + post_made = today - dt.timedelta(days=max_days) + result = submission_is_older(post_made.date(), max_days) + self.assertFalse(result) + + post_made = today - dt.timedelta(days=(max_days + 1)) + result = submission_is_older(post_made.date(), max_days) + self.assertTrue(result) diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..fa81d91 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,16 @@ +FROM python:3.14-slim + +WORKDIR /app + +# Copy application files +COPY Bot ./Bot + +RUN mkdir -p /app/config + +# Install dependencies +RUN pip install --no-cache-dir praw + +ENV PYTHONUNBUFFERED=1 + +# Run the script +CMD ["python", "Bot/main.py"] diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..c2b4cce --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2022 hor00s + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/config/config.json b/config/config.json new file mode 100644 index 0000000..997cb63 --- /dev/null +++ b/config/config.json @@ -0,0 +1,11 @@ +{ + "client_id": "", + "client_secret": "", + "user_agent": "", + "username": "", + "password": "", + "sub_name": "", + "max_days": "180", + "max_posts": "180", + "sleep_minutes": "5" +} \ No newline at end of file