WIP implimentig new files widget, updated settings, broke keybindings
This commit is contained in:
3
src/versions/solarfm-0.0.1/solarfm/utils/__init__.py
Normal file
3
src/versions/solarfm-0.0.1/solarfm/utils/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
"""
|
||||
Utils module
|
||||
"""
|
||||
@@ -0,0 +1,22 @@
|
||||
# Python imports
|
||||
|
||||
# Lib imports
|
||||
|
||||
# Application imports
|
||||
|
||||
|
||||
|
||||
|
||||
class EndpointRegistry():
|
||||
def __init__(self):
|
||||
self._endpoints = {}
|
||||
|
||||
def register(self, rule, **options):
|
||||
def decorator(f):
|
||||
self._endpoints[rule] = f
|
||||
return f
|
||||
|
||||
return decorator
|
||||
|
||||
def get_endpoints(self):
|
||||
return self._endpoints
|
||||
48
src/versions/solarfm-0.0.1/solarfm/utils/event_system.py
Normal file
48
src/versions/solarfm-0.0.1/solarfm/utils/event_system.py
Normal file
@@ -0,0 +1,48 @@
|
||||
# Python imports
|
||||
from collections import defaultdict
|
||||
|
||||
# Lib imports
|
||||
|
||||
# Application imports
|
||||
|
||||
|
||||
|
||||
|
||||
class EventSystem:
|
||||
""" Create event system. """
|
||||
|
||||
def __init__(self):
|
||||
self.subscribers = defaultdict(list)
|
||||
|
||||
|
||||
def subscribe(self, event_type, fn):
|
||||
self.subscribers[event_type].append(fn)
|
||||
|
||||
def unsubscribe(self, event_type, fn):
|
||||
self.subscribers[event_type].remove(fn)
|
||||
|
||||
def unsubscribe_all(self, event_type):
|
||||
self.subscribers.pop(event_type, None)
|
||||
|
||||
def emit(self, event_type, data = None):
|
||||
if event_type in self.subscribers:
|
||||
for fn in self.subscribers[event_type]:
|
||||
if data:
|
||||
if hasattr(data, '__iter__') and not type(data) is str:
|
||||
fn(*data)
|
||||
else:
|
||||
fn(data)
|
||||
else:
|
||||
fn()
|
||||
|
||||
def emit_and_await(self, event_type, data = None):
|
||||
""" NOTE: Should be used when signal has only one listener and vis-a-vis """
|
||||
if event_type in self.subscribers:
|
||||
for fn in self.subscribers[event_type]:
|
||||
if data:
|
||||
if hasattr(data, '__iter__') and not type(data) is str:
|
||||
return fn(*data)
|
||||
else:
|
||||
return fn(data)
|
||||
else:
|
||||
return fn()
|
||||
108
src/versions/solarfm-0.0.1/solarfm/utils/ipc_server.py
Normal file
108
src/versions/solarfm-0.0.1/solarfm/utils/ipc_server.py
Normal file
@@ -0,0 +1,108 @@
|
||||
# Python imports
|
||||
import os
|
||||
import time
|
||||
from multiprocessing.connection import Client
|
||||
from multiprocessing.connection import Listener
|
||||
|
||||
# Lib imports
|
||||
from gi.repository import GLib
|
||||
|
||||
# Application imports
|
||||
|
||||
|
||||
|
||||
|
||||
class IPCServer:
|
||||
""" Create a listener so that other SolarFM instances send requests back to existing instance. """
|
||||
def __init__(self, ipc_address: str = '127.0.0.1', conn_type: str = "socket"):
|
||||
self.is_ipc_alive = False
|
||||
self._ipc_port = 4848
|
||||
self._ipc_address = ipc_address
|
||||
self._conn_type = conn_type
|
||||
self._ipc_authkey = b'' + bytes(f'{app_name}-ipc', 'utf-8')
|
||||
self._ipc_timeout = 15.0
|
||||
|
||||
if conn_type == "socket":
|
||||
self._ipc_address = f'/tmp/{app_name}-ipc.sock'
|
||||
elif conn_type == "full_network":
|
||||
self._ipc_address = '0.0.0.0'
|
||||
elif conn_type == "full_network_unsecured":
|
||||
self._ipc_authkey = None
|
||||
self._ipc_address = '0.0.0.0'
|
||||
elif conn_type == "local_network_unsecured":
|
||||
self._ipc_authkey = None
|
||||
|
||||
self._subscribe_to_events()
|
||||
|
||||
|
||||
def _subscribe_to_events(self):
|
||||
event_system.subscribe("post_file_to_ipc", self.send_ipc_message)
|
||||
|
||||
def create_ipc_listener(self) -> None:
|
||||
if self._conn_type == "socket":
|
||||
if os.path.exists(self._ipc_address) and settings.is_dirty_start():
|
||||
os.unlink(self._ipc_address)
|
||||
|
||||
listener = Listener(address=self._ipc_address, family="AF_UNIX", authkey=self._ipc_authkey)
|
||||
elif "unsecured" not in self._conn_type:
|
||||
listener = Listener((self._ipc_address, self._ipc_port), authkey=self._ipc_authkey)
|
||||
else:
|
||||
listener = Listener((self._ipc_address, self._ipc_port))
|
||||
|
||||
|
||||
self.is_ipc_alive = True
|
||||
self._run_ipc_loop(listener)
|
||||
|
||||
@daemon_threaded
|
||||
def _run_ipc_loop(self, listener) -> None:
|
||||
while True:
|
||||
try:
|
||||
conn = listener.accept()
|
||||
start_time = time.perf_counter()
|
||||
GLib.idle_add(self._handle_ipc_message, *(conn, start_time,))
|
||||
except Exception as e:
|
||||
...
|
||||
|
||||
listener.close()
|
||||
|
||||
def _handle_ipc_message(self, conn, start_time) -> None:
|
||||
while True:
|
||||
msg = conn.recv()
|
||||
if settings.is_debug():
|
||||
print(msg)
|
||||
|
||||
if "FILE|" in msg:
|
||||
file = msg.split("FILE|")[1].strip()
|
||||
if file:
|
||||
event_system.emit("handle_file_from_ipc", file)
|
||||
|
||||
conn.close()
|
||||
break
|
||||
|
||||
|
||||
if msg in ['close connection', 'close server']:
|
||||
conn.close()
|
||||
break
|
||||
|
||||
# NOTE: Not perfect but insures we don't lock up the connection for too long.
|
||||
end_time = time.perf_counter()
|
||||
if (end_time - start_time) > self._ipc_timeout:
|
||||
conn.close()
|
||||
break
|
||||
|
||||
|
||||
def send_ipc_message(self, message: str = "Empty Data...") -> None:
|
||||
try:
|
||||
if self._conn_type == "socket":
|
||||
conn = Client(address=self._ipc_address, family="AF_UNIX", authkey=self._ipc_authkey)
|
||||
elif "unsecured" not in self._conn_type:
|
||||
conn = Client((self._ipc_address, self._ipc_port), authkey=self._ipc_authkey)
|
||||
else:
|
||||
conn = Client((self._ipc_address, self._ipc_port))
|
||||
|
||||
conn.send(message)
|
||||
conn.close()
|
||||
except ConnectionRefusedError as e:
|
||||
print("Connection refused...")
|
||||
except Exception as e:
|
||||
print(repr(e))
|
||||
138
src/versions/solarfm-0.0.1/solarfm/utils/keybindings.py
Normal file
138
src/versions/solarfm-0.0.1/solarfm/utils/keybindings.py
Normal file
@@ -0,0 +1,138 @@
|
||||
# Python imports
|
||||
import re
|
||||
|
||||
# Gtk imports
|
||||
import gi
|
||||
gi.require_version('Gdk', '3.0')
|
||||
from gi.repository import Gdk
|
||||
|
||||
# Application imports
|
||||
|
||||
|
||||
|
||||
def err(log = ""):
|
||||
"""Print an error message"""
|
||||
print(log)
|
||||
|
||||
|
||||
class KeymapError(Exception):
|
||||
"""Custom exception for errors in keybinding configurations"""
|
||||
|
||||
MODIFIER = re.compile('<([^<]+)>')
|
||||
class Keybindings:
|
||||
"""Class to handle loading and lookup of Terminator keybindings"""
|
||||
|
||||
modifiers = {
|
||||
'ctrl': Gdk.ModifierType.CONTROL_MASK,
|
||||
'control': Gdk.ModifierType.CONTROL_MASK,
|
||||
'primary': Gdk.ModifierType.CONTROL_MASK,
|
||||
'shift': Gdk.ModifierType.SHIFT_MASK,
|
||||
'alt': Gdk.ModifierType.MOD1_MASK,
|
||||
'super': Gdk.ModifierType.SUPER_MASK,
|
||||
'hyper': Gdk.ModifierType.HYPER_MASK,
|
||||
'mod2': Gdk.ModifierType.MOD2_MASK
|
||||
}
|
||||
|
||||
empty = {}
|
||||
keys = None
|
||||
_masks = None
|
||||
_lookup = None
|
||||
|
||||
def __init__(self):
|
||||
self.keymap = Gdk.Keymap.get_default()
|
||||
self.configure({})
|
||||
|
||||
def print_keys(self):
|
||||
print(self.keys)
|
||||
|
||||
def append_bindings(self, combos):
|
||||
"""Accept new binding(s) and reload"""
|
||||
for item in combos:
|
||||
method, keys = item.split(":")
|
||||
self.keys[method] = keys
|
||||
|
||||
self.reload()
|
||||
|
||||
def configure(self, bindings):
|
||||
"""Accept new bindings and reconfigure with them"""
|
||||
self.keys = bindings
|
||||
self.reload()
|
||||
|
||||
def reload(self):
|
||||
"""Parse bindings and mangle into an appropriate form"""
|
||||
self._lookup = {}
|
||||
self._masks = 0
|
||||
|
||||
for action, bindings in list(self.keys.items()):
|
||||
if isinstance(bindings, list):
|
||||
bindings = (*bindings,)
|
||||
elif not isinstance(bindings, tuple):
|
||||
bindings = (bindings,)
|
||||
|
||||
|
||||
for binding in bindings:
|
||||
if not binding or binding == "None":
|
||||
continue
|
||||
|
||||
try:
|
||||
keyval, mask = self._parsebinding(binding)
|
||||
# Does much the same, but with poorer error handling.
|
||||
#keyval, mask = Gtk.accelerator_parse(binding)
|
||||
except KeymapError as e:
|
||||
err ("keybinding reload failed to parse binding '%s': %s" % (binding, e))
|
||||
else:
|
||||
if mask & Gdk.ModifierType.SHIFT_MASK:
|
||||
if keyval == Gdk.KEY_Tab:
|
||||
keyval = Gdk.KEY_ISO_Left_Tab
|
||||
mask &= ~Gdk.ModifierType.SHIFT_MASK
|
||||
else:
|
||||
keyvals = Gdk.keyval_convert_case(keyval)
|
||||
if keyvals[0] != keyvals[1]:
|
||||
keyval = keyvals[1]
|
||||
mask &= ~Gdk.ModifierType.SHIFT_MASK
|
||||
else:
|
||||
keyval = Gdk.keyval_to_lower(keyval)
|
||||
|
||||
self._lookup.setdefault(mask, {})
|
||||
self._lookup[mask][keyval] = action
|
||||
self._masks |= mask
|
||||
|
||||
def _parsebinding(self, binding):
|
||||
"""Parse an individual binding using Gtk's binding function"""
|
||||
mask = 0
|
||||
modifiers = re.findall(MODIFIER, binding)
|
||||
|
||||
if modifiers:
|
||||
for modifier in modifiers:
|
||||
mask |= self._lookup_modifier(modifier)
|
||||
|
||||
key = re.sub(MODIFIER, '', binding)
|
||||
if key == '':
|
||||
raise KeymapError('No key found!')
|
||||
|
||||
keyval = Gdk.keyval_from_name(key)
|
||||
|
||||
if keyval == 0:
|
||||
raise KeymapError("Key '%s' is unrecognised..." % key)
|
||||
return (keyval, mask)
|
||||
|
||||
def _lookup_modifier(self, modifier):
|
||||
"""Map modifier names to gtk values"""
|
||||
try:
|
||||
return self.modifiers[modifier.lower()]
|
||||
except KeyError:
|
||||
raise KeymapError("Unhandled modifier '<%s>'" % modifier)
|
||||
|
||||
def lookup(self, event):
|
||||
"""Translate a keyboard event into a mapped key"""
|
||||
try:
|
||||
_found, keyval, _egp, _lvl, consumed = self.keymap.translate_keyboard_state(
|
||||
event.hardware_keycode,
|
||||
Gdk.ModifierType(event.get_state() & ~Gdk.ModifierType.LOCK_MASK),
|
||||
event.group)
|
||||
except TypeError:
|
||||
err ("Keybinding lookup failed to translate keyboard event: %s" % dir(event))
|
||||
return None
|
||||
|
||||
mask = (event.get_state() & ~consumed) & self._masks
|
||||
return self._lookup.get(mask, self.empty).get(keyval, None)
|
||||
57
src/versions/solarfm-0.0.1/solarfm/utils/logger.py
Normal file
57
src/versions/solarfm-0.0.1/solarfm/utils/logger.py
Normal file
@@ -0,0 +1,57 @@
|
||||
# Python imports
|
||||
import os
|
||||
import logging
|
||||
|
||||
# Application imports
|
||||
|
||||
|
||||
class Logger:
|
||||
"""
|
||||
Create a new logging object and return it.
|
||||
:note:
|
||||
NOSET # Don't know the actual log level of this... (defaulting or literally none?)
|
||||
Log Levels (From least to most)
|
||||
Type Value
|
||||
CRITICAL 50
|
||||
ERROR 40
|
||||
WARNING 30
|
||||
INFO 20
|
||||
DEBUG 10
|
||||
:param loggerName: Sets the name of the logger object. (Used in log lines)
|
||||
:param createFile: Whether we create a log file or just pump to terminal
|
||||
|
||||
:return: the logging object we created
|
||||
"""
|
||||
|
||||
def __init__(self, config_path: str, _ch_log_lvl = logging.CRITICAL, _fh_log_lvl = logging.INFO):
|
||||
self._CONFIG_PATH = config_path
|
||||
self.global_lvl = logging.DEBUG # Keep this at highest so that handlers can filter to their desired levels
|
||||
self.ch_log_lvl = _ch_log_lvl # Prety much the only one we ever change
|
||||
self.fh_log_lvl = _fh_log_lvl
|
||||
|
||||
def get_logger(self, loggerName: str = "NO_LOGGER_NAME_PASSED", createFile: bool = True) -> logging.Logger:
|
||||
log = logging.getLogger(loggerName)
|
||||
log.setLevel(self.global_lvl)
|
||||
|
||||
# Set our log output styles
|
||||
fFormatter = logging.Formatter('[%(asctime)s] %(pathname)s:%(lineno)d %(levelname)s - %(message)s', '%m-%d %H:%M:%S')
|
||||
cFormatter = logging.Formatter('%(pathname)s:%(lineno)d] %(levelname)s - %(message)s')
|
||||
|
||||
ch = logging.StreamHandler()
|
||||
ch.setLevel(level=self.ch_log_lvl)
|
||||
ch.setFormatter(cFormatter)
|
||||
log.addHandler(ch)
|
||||
|
||||
if createFile:
|
||||
folder = self._CONFIG_PATH
|
||||
file = f"{folder}/application.log"
|
||||
|
||||
if not os.path.exists(folder):
|
||||
os.mkdir(folder)
|
||||
|
||||
fh = logging.FileHandler(file)
|
||||
fh.setLevel(level=self.fh_log_lvl)
|
||||
fh.setFormatter(fFormatter)
|
||||
log.addHandler(fh)
|
||||
|
||||
return log
|
||||
@@ -0,0 +1,4 @@
|
||||
"""
|
||||
Settings module
|
||||
"""
|
||||
from .settings import Settings
|
||||
159
src/versions/solarfm-0.0.1/solarfm/utils/settings/settings.py
Normal file
159
src/versions/solarfm-0.0.1/solarfm/utils/settings/settings.py
Normal file
@@ -0,0 +1,159 @@
|
||||
# Python imports
|
||||
import os
|
||||
import inspect
|
||||
import json
|
||||
from os import path
|
||||
|
||||
# Gtk imports
|
||||
import gi
|
||||
gi.require_version('Gtk', '3.0')
|
||||
from gi.repository import Gtk
|
||||
from gi.repository import GLib
|
||||
|
||||
# Application imports
|
||||
from .start_check_mixin import StartCheckMixin
|
||||
|
||||
|
||||
|
||||
class Settings(StartCheckMixin):
|
||||
def __init__(self):
|
||||
self._SCRIPT_PTH = os.path.dirname(os.path.realpath(__file__))
|
||||
self._USER_HOME = path.expanduser('~')
|
||||
self._USR_PATH = f"/usr/share/{app_name.lower()}"
|
||||
|
||||
self._USR_CONFIG_FILE = f"{self._USR_PATH}/settings.json"
|
||||
self._HOME_CONFIG_PATH = f"{self._USER_HOME}/.config/{app_name.lower()}"
|
||||
self._PLUGINS_PATH = f"{self._HOME_CONFIG_PATH}/plugins"
|
||||
self._DEFAULT_ICONS = f"{self._HOME_CONFIG_PATH}/icons"
|
||||
self._CONFIG_FILE = f"{self._HOME_CONFIG_PATH}/settings.json"
|
||||
self._GLADE_FILE = f"{self._HOME_CONFIG_PATH}/Main_Window.glade"
|
||||
self._CSS_FILE = f"{self._HOME_CONFIG_PATH}/stylesheet.css"
|
||||
self._KEY_BINDINGS_FILE = f"{self._HOME_CONFIG_PATH}/key-bindings.json"
|
||||
self._PID_FILE = f"{self._HOME_CONFIG_PATH}/{app_name.lower()}.pid"
|
||||
self._WINDOW_ICON = f"{self._DEFAULT_ICONS}/icons/{app_name.lower()}.png"
|
||||
|
||||
if not os.path.exists(self._HOME_CONFIG_PATH):
|
||||
os.mkdir(self._HOME_CONFIG_PATH)
|
||||
if not os.path.exists(self._PLUGINS_PATH):
|
||||
os.mkdir(self._PLUGINS_PATH)
|
||||
|
||||
if not os.path.exists(self._CONFIG_FILE):
|
||||
import shutil
|
||||
try:
|
||||
shutil.copyfile(self._USR_CONFIG_FILE, self._CONFIG_FILE)
|
||||
except Exception as e:
|
||||
raise
|
||||
|
||||
if not os.path.exists(self._DEFAULT_ICONS):
|
||||
self._DEFAULT_ICONS = f"{self._USR_PATH}/icons"
|
||||
if not os.path.exists(self._GLADE_FILE):
|
||||
self._GLADE_FILE = f"{self._USR_PATH}/Main_Window.glade"
|
||||
if not os.path.exists(self._KEY_BINDINGS_FILE):
|
||||
self._KEY_BINDINGS_FILE = f"{self._USR_PATH}/key-bindings.json"
|
||||
if not os.path.exists(self._CSS_FILE):
|
||||
self._CSS_FILE = f"{self._USR_PATH}/stylesheet.css"
|
||||
if not os.path.exists(self._WINDOW_ICON):
|
||||
self._WINDOW_ICON = f"{self._USR_PATH}/icons/{app_name.lower()}.png"
|
||||
|
||||
|
||||
self._UI_WIDEGTS_PATH = f"{self._USR_PATH}/ui_widgets"
|
||||
self._CONTEXT_MENU = f"{self._USR_PATH}/contexct_menu.json"
|
||||
self._TRASH_FILES_PATH = f"{GLib.get_user_data_dir()}/Trash/files"
|
||||
self._TRASH_INFO_PATH = f"{GLib.get_user_data_dir()}/Trash/info"
|
||||
self._ICON_THEME = Gtk.IconTheme.get_default()
|
||||
|
||||
if not os.path.exists(self._CONTEXT_MENU):
|
||||
self._CONTEXT_MENU = f"{self._USR_PATH}/contexct_menu.json"
|
||||
|
||||
with open(self._KEY_BINDINGS_FILE) as file:
|
||||
bindings = json.load(file)["keybindings"]
|
||||
keybindings.configure(bindings)
|
||||
|
||||
with open(self._CONTEXT_MENU) as file:
|
||||
self._context_menu_data = json.load(file)
|
||||
|
||||
self._main_window = None
|
||||
self._builder = None
|
||||
|
||||
self._trace_debug = False
|
||||
self._debug = False
|
||||
self._dirty_start = False
|
||||
|
||||
self.load_settings()
|
||||
|
||||
|
||||
def register_signals_to_builder(self, classes=None):
|
||||
handlers = {}
|
||||
|
||||
for c in classes:
|
||||
methods = None
|
||||
try:
|
||||
methods = inspect.getmembers(c, predicate=inspect.ismethod)
|
||||
handlers.update(methods)
|
||||
except Exception as e:
|
||||
print(repr(e))
|
||||
|
||||
self._builder.connect_signals(handlers)
|
||||
|
||||
|
||||
def get_monitor_data(self) -> list:
|
||||
screen = self._main_window.get_screen()
|
||||
monitors = []
|
||||
for m in range(screen.get_n_monitors()):
|
||||
monitors.append(screen.get_monitor_geometry(m))
|
||||
print("{}x{}+{}+{}".format(monitor.width, monitor.height, monitor.x, monitor.y))
|
||||
|
||||
return monitors
|
||||
|
||||
def set_builder(self, builder) -> any: self._builder = builder
|
||||
def set_main_window(self, window): self._main_window = window
|
||||
|
||||
def get_main_window(self) -> Gtk.ApplicationWindow: return self._main_window
|
||||
def get_builder(self) -> Gtk.Builder: return self._builder
|
||||
def get_glade_file(self) -> str: return self._GLADE_FILE
|
||||
def get_icon_theme(self) -> str: return self._ICON_THEME
|
||||
def get_css_file(self) -> str: return self._CSS_FILE
|
||||
def get_home_config_path(self) -> str: return self._HOME_CONFIG_PATH
|
||||
def get_window_icon(self) -> str: return self._WINDOW_ICON
|
||||
|
||||
def get_context_menu_data(self) -> str: return self._context_menu_data
|
||||
def get_ui_widgets_path(self) -> str: return self._UI_WIDEGTS_PATH
|
||||
def get_trash_files_path(self) -> str: return self._TRASH_FILES_PATH
|
||||
def get_trash_info_path(self) -> str: return self._TRASH_INFO_PATH
|
||||
def get_plugins_path(self) -> str: return self._PLUGINS_PATH
|
||||
|
||||
# Filter returns
|
||||
def get_office_filter(self) -> tuple: return tuple(self._settings["filters"]["office"])
|
||||
def get_vids_filter(self) -> tuple: return tuple(self._settings["filters"]["videos"])
|
||||
def get_text_filter(self) -> tuple: return tuple(self._settings["filters"]["text"])
|
||||
def get_music_filter(self) -> tuple: return tuple(self._settings["filters"]["music"])
|
||||
def get_images_filter(self) -> tuple: return tuple(self._settings["filters"]["images"])
|
||||
def get_pdf_filter(self) -> tuple: return tuple(self._settings["filters"]["pdf"])
|
||||
|
||||
def get_success_color(self) -> str: return self._theming["success_color"]
|
||||
def get_warning_color(self) -> str: return self._theming["warning_color"]
|
||||
def get_error_color(self) -> str: return self._theming["error_color"]
|
||||
|
||||
def is_trace_debug(self) -> bool: return self._trace_debug
|
||||
def is_debug(self) -> bool: return self._debug
|
||||
|
||||
def get_ch_log_lvl(self) -> str: return self._settings["debugging"]["ch_log_lvl"]
|
||||
def get_fh_log_lvl(self) -> str: return self._settings["debugging"]["fh_log_lvl"]
|
||||
|
||||
|
||||
def set_trace_debug(self, trace_debug: bool):
|
||||
self._trace_debug = trace_debug
|
||||
|
||||
def set_debug(self, debug: bool):
|
||||
self._debug = debug
|
||||
|
||||
|
||||
def load_settings(self):
|
||||
with open(self._CONFIG_FILE) as f:
|
||||
self._settings = json.load(f)
|
||||
self._config = self._settings["config"]
|
||||
self._theming = self._settings["theming"]
|
||||
|
||||
def save_settings(self):
|
||||
with open(self._CONFIG_FILE, 'w') as outfile:
|
||||
json.dump(self._settings, outfile, separators=(',', ':'), indent=4)
|
||||
@@ -0,0 +1,50 @@
|
||||
# Python imports
|
||||
import os
|
||||
import json
|
||||
import inspect
|
||||
|
||||
# Lib imports
|
||||
|
||||
# Application imports
|
||||
|
||||
|
||||
|
||||
|
||||
class StartCheckMixin:
|
||||
def is_dirty_start(self) -> bool: return self._dirty_start
|
||||
def clear_pid(self): self._clean_pid()
|
||||
|
||||
def do_dirty_start_check(self):
|
||||
if not os.path.exists(self._PID_FILE):
|
||||
self._write_new_pid()
|
||||
else:
|
||||
with open(self._PID_FILE, "r") as _pid:
|
||||
pid = _pid.readline().strip()
|
||||
if pid not in ("", None):
|
||||
self._check_alive_status(int(pid))
|
||||
else:
|
||||
self._write_new_pid()
|
||||
|
||||
""" Check For the existence of a unix pid. """
|
||||
def _check_alive_status(self, pid):
|
||||
print(f"PID Found: {pid}")
|
||||
try:
|
||||
os.kill(pid, 0)
|
||||
except OSError:
|
||||
print(f"{app_name} is starting dirty...")
|
||||
self._dirty_start = True
|
||||
self._write_new_pid()
|
||||
return
|
||||
|
||||
print("PID is alive... Let downstream errors (sans debug args) handle app closure propigation.")
|
||||
|
||||
def _write_new_pid(self):
|
||||
pid = os.getpid()
|
||||
self._write_pid(pid)
|
||||
|
||||
def _clean_pid(self):
|
||||
os.unlink(self._PID_FILE)
|
||||
|
||||
def _write_pid(self, pid):
|
||||
with open(self._PID_FILE, "w") as _pid:
|
||||
_pid.write(f"{pid}")
|
||||
Reference in New Issue
Block a user