diff --git a/bambulabs_api/camera_client.py b/bambulabs_api/camera_client.py index 8e93067..0244bfc 100644 --- a/bambulabs_api/camera_client.py +++ b/bambulabs_api/camera_client.py @@ -4,11 +4,12 @@ import struct import socket import ssl -import logging from threading import Thread import time +from bambulabs_api.logger import logger + __all__ = ["PrinterCamera"] @@ -62,7 +63,7 @@ def get_frame(self): return encoded_image def retriever(self): - print("Starting camera thread.") + logger.info("Starting camera thread.") auth_data = bytearray() connect_attempts = 0 @@ -97,7 +98,7 @@ def retriever(self): connect_attempts += 1 sslSock = ctx.wrap_socket(sock, server_hostname=self.__hostname) # noqa - logging.info("Attempting to connect...") + logger.info("Attempting to connect...") sslSock.write(auth_data) img = None payload_size = 0 @@ -105,9 +106,9 @@ def retriever(self): status = sslSock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) if status != 0: - logging.warning(f"Socket error: {status}") # noqa # pylint: disable=logging-fstring-interpolation + logger.warning(f"Socket error: {status}") # noqa # pylint: disable=logging-fstring-interpolation except socket.error as e: # noqa - logging.warning(f"Error in socket: {e}") # noqa # pylint: disable=logging-fstring-interpolation + logger.warning(f"Error in socket: {e}") # noqa # pylint: disable=logging-fstring-interpolation continue sslSock.setblocking(False) @@ -115,7 +116,7 @@ def retriever(self): while self.alive: try: - logging.debug("Reading chunk...") + logger.debug("Reading chunk...") dr = sslSock.recv(read_chunk_size) except ssl.SSLWantReadError: @@ -123,14 +124,14 @@ def retriever(self): continue except Exception as e: # noqa # pylint: disable=broad-exception-caught - logging.error(f"Exception. Type: {type(e)} Args: {e}") # noqa # pylint: disable=logging-fstring-interpolation + logger.error(f"Exception. Type: {type(e)} Args: {e}") # noqa # pylint: disable=logging-fstring-interpolation time.sleep(1) break - logging.debug(f"Read chunk {len(dr)}") # noqa # pylint: disable=logging-fstring-interpolation + logger.debug(f"Read chunk {len(dr)}") # noqa # pylint: disable=logging-fstring-interpolation if img is not None and len(dr) > 0: - logging.debug("Appending to Image") + logger.debug("Appending to Image") img += dr if len(img) > payload_size: img = None @@ -144,7 +145,7 @@ def retriever(self): img = None elif len(dr) == 16: - logging.debug("Got header") + logger.debug("Got header") connect_attempts = 0 img = bytearray() payload_size = int.from_bytes(dr[0:3], @@ -152,18 +153,18 @@ def retriever(self): elif len(dr) == 0: time.sleep(5) - logging.error("Wrong access code or IP") + logger.error("Wrong access code or IP") break else: - logging.error("something bad happened") + logger.error("something bad happened") time.sleep(1) break except Exception as e: # noqa # pylint: disable=broad-exception-caught - logging.error(f"Error occurred: {e}") # noqa # pylint: disable=logging-fstring-interpolation + logger.error(f"Error occurred: {e}") # noqa # pylint: disable=logging-fstring-interpolation continue finally: time.sleep(5) - logging.info("Reconnecting...") + logger.info("Reconnecting...") continue diff --git a/bambulabs_api/ftp_client.py b/bambulabs_api/ftp_client.py index e1916bd..9eb3fa6 100644 --- a/bambulabs_api/ftp_client.py +++ b/bambulabs_api/ftp_client.py @@ -6,11 +6,12 @@ import ssl from PIL import Image -import logging from typing import Any, BinaryIO from PIL.ImageFile import ImageFile +from bambulabs_api.logger import logger + class ImplicitFTP_TLS(ftplib.FTP_TLS): """FTP_TLS subclass that automatically wraps sockets in SSL to support implicit FTPS.""" # noqa @@ -82,26 +83,26 @@ def connect_and_run(func): func (function): the function to be decorated """ # noqa - def wrapper(self, *args, **kwargs) -> Any: - logging.info("Connecting to FTP server...") + def wrapper(self: 'PrinterFTPClient', *args, **kwargs) -> Any: + logger.info("Connecting to FTP server...") self.ftps.connect(host=self.server_ip, port=self.port) self.ftps.login(self.user, self.access_code) - logging.info("Connected to FTP server") - logging.info(self.ftps.prot_p()) + logger.info("Connected to FTP server") + logger.info(self.ftps.prot_p()) try: return func(self, *args, **kwargs) # type: ignore except Exception as e: # noqa # pylint: disable=broad-exception-caught - logging.error(f"Failed to execute function: {e}") # noqa # pylint: disable=logging-fstring-interpolation + logger.error(f"Failed to execute function: {e}") # noqa # pylint: disable=logging-fstring-interpolation finally: self.ftps.close() - logging.info("Connection to FTP server closed") + logger.info("Connection to FTP server closed") return wrapper @connect_and_run def upload_file(self, file: BinaryIO, file_path: str) -> str: return self.ftps.storbinary(f'STOR {file_path}', file, blocksize=32768, - callback=lambda x: logging.debug(f"Uploaded {x} bytes")) # noqa # pylint: disable=logging-fstring-interpolation + callback=lambda x: logger.debug(f"Uploaded {x} bytes")) # noqa # pylint: disable=logging-fstring-interpolation @connect_and_run def list_directory(self, path: str | None = None) -> tuple[str, list[str]]: @@ -199,7 +200,7 @@ def download_file( @connect_and_run def delete_file(self, file_path: str) -> str: - logging.info(f"Deleting file: {file_path}") # noqa # pylint: disable=logging-fstring-interpolation + logger.info(f"Deleting file: {file_path}") # noqa # pylint: disable=logging-fstring-interpolation return self.ftps.delete(file_path) def close(self) -> None: diff --git a/bambulabs_api/logger.py b/bambulabs_api/logger.py new file mode 100644 index 0000000..7e54307 --- /dev/null +++ b/bambulabs_api/logger.py @@ -0,0 +1,3 @@ +import logging + +logger = logging.getLogger("bambulabs_api") diff --git a/bambulabs_api/mqtt_client.py b/bambulabs_api/mqtt_client.py index ffc49f6..d2b06bf 100644 --- a/bambulabs_api/mqtt_client.py +++ b/bambulabs_api/mqtt_client.py @@ -1,7 +1,7 @@ __all__ = ["PrinterMQTTClient"] import json -import logging +from bambulabs_api.logger import logger import ssl import datetime from typing import Any, Callable, Union @@ -125,7 +125,7 @@ def __init__( self._last_update: int = 0 self.command_topic = f"device/{printer_serial}/request" - logging.info(f"{self.command_topic}") # noqa: E501 # pylint: disable=logging-fstring-interpolation + logger.info(f"{self.command_topic}") # noqa: E501 # pylint: disable=logging-fstring-interpolation self._data: dict[Any, Any] = {} self.ams_hub: AMSHub = AMSHub() @@ -154,7 +154,7 @@ def wrapper( *args: list[Any], **kwargs: dict[str, Any]) -> Any: if not self.ready(): - logging.error("Printer Values Not Available Yet") + logger.error("Printer Values Not Available Yet") if self.strict: raise Exception("Printer not found") @@ -172,8 +172,8 @@ def _on_disconnect( reason_code: paho.mqtt.reasoncodes.ReasonCode, properties: Union[paho.mqtt.properties.Properties, None], ) -> None: - logging.info(f"Client Disconnected: {client} {userdata} " - f"{disconnect_flags} {reason_code} {properties}") + logger.info(f"Client Disconnected: {client} {userdata} " + f"{disconnect_flags} {reason_code} {properties}") self.on_disconnect_handler( self, client, @@ -199,7 +199,7 @@ def manual_update(self, doc: dict[str, Any]) -> None: if k not in self._data: self._data[k] = {} self._data[k] |= v - logging.debug(self._data) + logger.debug(self._data) firmware_version = self.firmware_version() if firmware_version is not None: @@ -228,9 +228,9 @@ def _on_connect( rc : int The connection result """ - logging.info(f"Connection result code: {rc}") + logger.info(f"Connection result code: {rc}") if rc == 0 or not rc.is_failure: - logging.info("Connected successfully") + logger.info("Connected successfully") client.subscribe(f"device/{self._printer_serial}/report") if self.pushall_aggressive: self._client.publish( @@ -240,9 +240,9 @@ def _on_connect( "info": {"command": "get_version"}, "upgrade": {"command": "get_history"}, })) - logging.info("Connection Handshake Completed") + logger.info("Connection Handshake Completed") else: - logging.warning(f"Connection failed with result code {rc}") + logger.warning(f"Connection failed with result code {rc}") self.on_connect_handler( self, @@ -429,11 +429,11 @@ def __publish_command(self, payload: dict[Any, Any]) -> bool: payload (dict[Any, Any]): command to send to the printer """ if self._client.is_connected() is False: - logging.error("Not connected to the MQTT server") + logger.error("Not connected to the MQTT server") return False command = self._client.publish(self.command_topic, json.dumps(payload)) - logging.debug(f"Published command: {payload}") + logger.debug(f"Published command: {payload}") command.wait_for_publish() return command.is_published() @@ -659,7 +659,7 @@ def set_bed_temperature( return self.__send_gcode_line(f"M140 S{temperature}\n") else: if temperature < 40 and not override: - logging.warning( + logger.warning( "Attempting to set low bed temperature not recommended. " "Set override flag to true to if you're sure you want to " f"run M190 S{temperature};" @@ -809,7 +809,7 @@ def get_access_code(self) -> str: if code is None: return self._access elif code != self._access: - logging.error( + logger.error( f"Unexpected state: our access code is {self._access}; " f"reported is {code}") return code @@ -869,7 +869,7 @@ def set_nozzle_temperature( return self.__send_gcode_line(f"M104 S{temperature}\n") else: if temperature < 60 and not override: - logging.warning( + logger.warning( "Attempting to set low bed temperature not recommended. " "Set override flag to true to if you're sure you want to " f"run M109 S{temperature};" @@ -1161,7 +1161,7 @@ def upgrade_firmware(self, override: bool = False) -> bool: new_firmware = self.new_printer_firmware() if new_firmware is not None: if new_firmware >= "1.08" and not override: - logging.warning( + logger.warning( f"You are about to upgrade to {new_firmware}." "Firmware above 1.08 may result in api incompatibility" ) @@ -1189,7 +1189,7 @@ def downgrade_firmware(self, firmware_version: str) -> bool: """ firmware_history = self.get_firmware_history() if not firmware_history: - logging.warning("Firmware history not up to date") + logger.warning("Firmware history not up to date") return False firmware = next( (firmware["firmware"] for firmware in firmware_history @@ -1197,7 +1197,7 @@ def downgrade_firmware(self, firmware_version: str) -> bool: "version", None) == firmware_version), None) if firmware is None: - logging.warning( + logger.warning( f"Firmware {firmware_version} not found in listed firmware") return False @@ -1308,7 +1308,7 @@ def reboot(self) -> bool: Returns: bool: if printer has been rebooted correctly """ - logging.warning("Sending reboot command!") + logger.warning("Sending reboot command!") return self.__publish_command( { "system": {