I am trying to build an observer for the system to watch for applications' close/open/minimized/activated events
I was able to build the below observer. However, I was stuck on how to connect it
class AbstractObserver(ABC):
""" The Observer interface declares the update method, used by subjects. """
@abstractmethod
def update(self, ) -> None:
""" Receive update from subject. """
pass
class AbstractObserved(ABC):
""" The Subject interface declares a set of methods for managing subscribers. """
_state = None
_observers = {}
@abstractmethod
def attach(self, id) -> None:
pass
def detach(self, id) -> None:
""" Detach an observer from the subject."""
print(f'Task{id} Detach From Observer.')
try:
self._observers.pop(id, None)
except ValueError:
print(f'Task{id} Not found in observers.')
pass
@abstractmethod
def notify(self, ids, state='create') -> None:
""" Notify all observers about an event."""
pass
class ApplicationObserver(AbstractObserver):
_tracker = None
_start = None
def update(self, ) -> None:
pass
class ObservedApplication(AbstractObserved):
""" The Subject interface declares a set of methods for managing subscribers. """
_state = None
_observers = {}
def attach(self, id) -> None:
""" Attach an observer to the subject. """
pass
def notify(self, ids: dict, state='create') -> None:
""" Notify all observers about an event."""
pass
I am trying to connect it to the generator results from psutil.process_iter()
function But, as a generator, there is always an extra step of converting it to a list causing me troubles in the connection
Aucun commentaire:
Enregistrer un commentaire