diff --git a/Makefile b/Makefile index 9825ab1..38d5915 100644 --- a/Makefile +++ b/Makefile @@ -101,7 +101,7 @@ define rsync_to_dest exit 1; \ fi - @rsync -avh ./config.json $(2)/version.py $(1)/*.py $(1)/lib --exclude=".*" --exclude='requirements.txt' --exclude='__pycache__' $(2) --delete --times --checksum + @rsync -avh ./config.json $(1)/version.py $(1)/*.py $(1)/lib --exclude=".*" --exclude='requirements.txt' --exclude='__pycache__' $(2) --delete --times --checksum endef ##@ Build Tools diff --git a/config.json b/config.json deleted file mode 100644 index 4882a8e..0000000 --- a/config.json +++ /dev/null @@ -1,89 +0,0 @@ -{ - "battery_voltage": 5.2, - "critical_battery_voltage": 6.6, - "cubesat_name": "PROVES-", - "current_draw": 240.5, - "debug": true, - "degraded_battery_voltage": 6.6, - "detumble_enable_x": true, - "detumble_enable_y": true, - "detumble_enable_z": true, - "heating": false, - "joke_reply": [ - "Your Mom", - "Your Mum", - "Your Face", - "not True lol", - "I have brought peace, freedom, justice, and security to my new empire! Your New Empire?" - ], - "jokes": [ - "Hey it is pretty cold up here, did someone forget to pay the electric bill?", - "sudo rf - rf*", - "Why did the astronaut break up with his girlfriend? He needed space.", - "Why did the sun go to school? To get a little brighter.", - "why is the mall called the mall? because instead of going to one store you go to them all", - "Alien detected. Blurring photo...", - "Wait it is all open source? Always has been... www.github.com/proveskit", - "What did 0 say to 1? You're a bit too much.", - "Pleiades - Orpheus has been recently acquired by the Onion News Network", - "This jokesat was brought to you by the Bronco Space Ministry of Labor and Job Placement", - "Catch you on the next pass!", - "Pleiades - Orpheus was not The Impostor", - "Sorry for messing with your long-exposure astrophoto!", - "Better buy a telescope. Wanna see me. Buy a telescope. Gonna be in space.", - "According to all known laws of aviation, there is no way bees should be able to fly...", - "You lost the game ", - "Bobby Tables is a good friend of mine", - "Why did the computer cross the road? To get a byte to eat!", - "Why are the astronauts not hungry when they got to space? They had a big launch.", - "Why did the computer get glasses? To improve its web sight!", - "What are computers favorite snacks? Chips!", - "Wait! I think I see a White 2019 Subaru Crosstrek 2.0i Premium", - "IS THAT A SUPRA?!", - "Finally escpaed the LA Traffic", - "My CubeSat is really good at jokes, but its delivery is always delayed.", - "exec order 66", - "I had a joke about UDP, but I am not sure if you'd get it.", - "I am not saying FSK modulation is the best way to send jokes, but at least it is never monotone!", - "I am sorry David, I am afrain I can not do that.", - "My memory is volatile like RAM, so it only makes sense that I forget things.", - "Imagine it gets stuck and just keeps repeating this joke every 2 mins", - "Check Engine: Error Code 404: Joke Not Found", - "CQ CQ KN6NAQ ... KN6NAT are you out there?", - "Woah is that the Launcher Orbiter?????", - "Everything in life is a spring if you think hard enough!" - ], - "last_battery_temp": 20.0, - "longest_allowable_sleep_time": 360, - "normal_battery_temp": 1, - "normal_battery_voltage": 6.9, - "normal_charge_current": 0.5, - "normal_micro_temp": 20, - "normal_temp": 20, - "radio": { - "fsk": { - "broadcast_address": 255, - "modulation_type": 0, - "node_address": 1 - }, - "license": "", - "lora": { - "ack_delay": 0.2, - "coding_rate": 8, - "cyclic_redundancy_check": true, - "max_output": true, - "spreading_factor": 8, - "transmit_power": 23 - }, - "modulation": "LoRa", - "receiver_id": 250, - "sender_id": 251, - "start_time": 80000, - "transmit_frequency": 437.4 - }, - "reboot_time": 3600, - "repeat_code": "RP", - "sleep_duration": 30, - "super_secret_code": "ABCD", - "turbo_clock": false -} diff --git a/src/flight-software/boot.py b/src/flight-software/boot.py index e69de29..94a8f25 100644 --- a/src/flight-software/boot.py +++ b/src/flight-software/boot.py @@ -0,0 +1,44 @@ +# from busio import _spi_init +# import os +# from lib.pysquared.nvm.counter import Counter + +# import board +# import sdcardio +import storage + +# This tells the computer to disconnect from the board's mass storage device (CIRCUITPY drive). +# This is the correct method for CircuitPython 9+ +storage.disable_usb_drive() # disable CIRCUITPY +try: + with open("/sd/leaderboard.json", "r") as file: + pass # File exists +except OSError: + with open("/sd/leaderboard.json", "w") as file: + file.write("{}") + + +# After the USB drive is disabled, we can remount the /sd filesystem +# as writable for our CircuitPython code. +# storage.remount("/sd", readonly=False) + +# os.mkdir("/sd") # Ensure the lib directory exists' + +# from lib.pysquared.logger import Logger + +# logger: Logger = Logger( +# error_counter=Counter(0), +# colorized=False, +# ) + +# spi1 = _spi_init( +# logger, +# board.SPI1_SCK, +# board.SPI1_MOSI, +# board.SPI1_MISO, +# ) + +# sd = sdcardio.SDCard(spi1, board.SPI1_CS1) +# vfs = storage.VfsFat(sd) + +# storage.mount(vfs, "/sd") +# os.listdir("/sd") diff --git a/src/flight-software/main.py b/src/flight-software/main.py index 33ab8c1..642bbcf 100644 --- a/src/flight-software/main.py +++ b/src/flight-software/main.py @@ -1,4 +1,3 @@ -import gc import os import time @@ -21,10 +20,15 @@ from lib.pysquared.rtc.manager.microcontroller import MicrocontrollerManager from lib.pysquared.sleep_helper import SleepHelper from lib.pysquared.watchdog import Watchdog +from utils import listener_nominal_power_loop from version import __version__ boot_time: float = time.time() +# Satellite configuration constants +CUBE_IDS = ["Listener1", "Listener2", "Listener3"] +MY_CUBESAT_ID = "Listener1" + rtc = MicrocontrollerManager() (boot_count := Counter(index=Register.boot_count)).increment() @@ -128,25 +132,12 @@ boot_count, ) - def nominal_power_loop(): - logger.debug( - "FC Board Stats", - bytes_remaining=gc.mem_free(), - ) - - uhf_packet_manager.send(config.radio.license.encode("utf-8")) - - beacon.send() - - cdh.listen_for_commands(10) - - sleep_helper.safe_sleep(config.sleep_duration) - try: logger.info("Entering main loop") while True: # TODO(nateinaction): Modify behavior based on power state - nominal_power_loop() + listener_nominal_power_loop(logger, uhf_packet_manager, sleep_helper) + # nominal_power_loop(logger, uhf_packet_manager, sleep_helper, cube_ids=["Listener1", "Listener2", "Listener3"]) except Exception as e: logger.critical("Critical in Main Loop", e) diff --git a/src/flight-software/repl.py b/src/flight-software/repl.py index b98e67f..9af278c 100644 --- a/src/flight-software/repl.py +++ b/src/flight-software/repl.py @@ -3,12 +3,14 @@ import board import digitalio +import microcontroller +import storage from lib.adafruit_mcp230xx.mcp23017 import ( MCP23017, # This is Hacky V5a Devel Stuff### ) from lib.adafruit_tca9548a import TCA9548A # This is Hacky V5a Devel Stuff### -# from lib.pysquared.Big_Data import AllFaces ### This is Hacky V5a Devel Stuff### +# from lib.pysquared.Big_Data import AllFaces ### This is Hacky V5a Devel Stuff## from lib.pysquared.beacon import Beacon from lib.pysquared.cdh import CommandDataHandler from lib.pysquared.config.config import Config @@ -16,24 +18,49 @@ from lib.pysquared.hardware.busio import _spi_init, initialize_i2c_bus from lib.pysquared.hardware.digitalio import initialize_pin from lib.pysquared.hardware.imu.manager.lsm6dsox import LSM6DSOXManager -from lib.pysquared.hardware.light_sensor.manager.veml7700 import VEML7700Manager from lib.pysquared.hardware.magnetometer.manager.lis2mdl import LIS2MDLManager from lib.pysquared.hardware.power_monitor.manager.ina219 import INA219Manager from lib.pysquared.hardware.radio.manager.rfm9x import RFM9xManager from lib.pysquared.hardware.radio.manager.sx1280 import SX1280Manager from lib.pysquared.hardware.radio.packetizer.packet_manager import PacketManager -from lib.pysquared.hardware.temperature_sensor.manager.mcp9808 import MCP9808Manager from lib.pysquared.logger import Logger from lib.pysquared.nvm.counter import Counter from lib.pysquared.protos.power_monitor import PowerMonitorProto from lib.pysquared.rtc.manager.microcontroller import MicrocontrollerManager from lib.pysquared.sleep_helper import SleepHelper from lib.pysquared.watchdog import Watchdog + +# Local imports from version import __version__ + +def erase_system(): + """Erase the filesystem to allow new code to be written to the board.""" + storage.erase_filesystem() + + +def hard_reboot(): + """Perform a hard reboot of the microcontroller.""" + microcontroller.reset() + + +def get_temp(sensor): + """ + Get temperature readings from a sensor for testing purposes. + + Args: + sensor: Temperature sensor object + """ + for i in range(1000): + print(sensor.get_temperature().value) + time.sleep(0.1) + + +# Initialize RTC rtc = MicrocontrollerManager() -logger: Logger = Logger( +# Initialize logger +logger = Logger( error_counter=Counter(0), colorized=False, ) @@ -44,13 +71,7 @@ software_version=__version__, ) - -def get_temp(sensor): - for i in range(1000): - print(sensor.get_temperature().value) - time.sleep(0.1) - - +# Initialize watchdog watchdog = Watchdog(logger, board.WDT_WDI) watchdog.pet() @@ -100,6 +121,7 @@ def get_temp(sensor): 100000, ) + sleep_helper = SleepHelper(logger, config, watchdog) uhf_radio = RFM9xManager( @@ -200,21 +222,21 @@ def all_faces_on(): tca = TCA9548A(i2c1, address=int(0x77)) -light_sensor0 = VEML7700Manager(logger, tca[0]) -light_sensor1 = VEML7700Manager(logger, tca[1]) -light_sensor2 = VEML7700Manager(logger, tca[2]) -light_sensor3 = VEML7700Manager(logger, tca[3]) -light_sensor4 = VEML7700Manager(logger, tca[4]) +# light_sensor0 = VEML7700Manager(logger, tca[0]) +# light_sensor1 = VEML7700Manager(logger, tca[1]) +# light_sensor2 = VEML7700Manager(logger, tca[2]) +# light_sensor3 = VEML7700Manager(logger, tca[3]) +# light_sensor4 = VEML7700Manager(logger, tca[4]) -## Onboard Temp Sensor ## -temp_sensor5 = MCP9808Manager(logger, i2c0, addr=25) # Antenna Board -temp_sensor6 = MCP9808Manager(logger, i2c1, addr=27) # Flight Controller Board -temp_sensor0 = MCP9808Manager(logger, tca[0], addr=27) -temp_sensor1 = MCP9808Manager(logger, tca[1], addr=27) -temp_sensor2 = MCP9808Manager(logger, tca[2], addr=27) -temp_sensor3 = MCP9808Manager(logger, tca[3], addr=27) -temp_sensor4 = MCP9808Manager(logger, tca[4], addr=27) +# ## Onboard Temp Sensor ## +# temp_sensor5 = MCP9808Manager(logger, i2c0, addr=25) # Antenna Board +# temp_sensor6 = MCP9808Manager(logger, i2c1, addr=27) # Flight Controller Board +# temp_sensor0 = MCP9808Manager(logger, tca[0], addr=27) +# temp_sensor1 = MCP9808Manager(logger, tca[1], addr=27) +# temp_sensor2 = MCP9808Manager(logger, tca[2], addr=27) +# temp_sensor3 = MCP9808Manager(logger, tca[3], addr=27) +# temp_sensor4 = MCP9808Manager(logger, tca[4], addr=27) battery_power_monitor: PowerMonitorProto = INA219Manager(logger, i2c1, 0x40) diff --git a/src/flight-software/utils.py b/src/flight-software/utils.py new file mode 100644 index 0000000..04d6f58 --- /dev/null +++ b/src/flight-software/utils.py @@ -0,0 +1,263 @@ +import json +import os +import time + +# Constants for communication commands +SEND_LEADERBOARD = "send_leaderboard" +SEND_PING = "ping" +CUBESAT_ID = "Listener1" # Updated to match main.py configuration +UPDATE_LEADERBOARD = "update_leaderboard" + +# Default cube IDs for communication +DEFAULT_CUBE_IDS = ["Listener1", "Listener2", "Listener3"] + + +def nominal_power_loop( + logger, uhf_packet_manager, sleep_helper, cube_ids=DEFAULT_CUBE_IDS +): + """ + Execute the nominal power loop for pinging multiple cubesats. + + Args: + logger: Logger instance for logging messages + uhf_packet_manager: Packet manager for UHF communication + sleep_helper: Helper for safe sleep operations + cube_ids: List of cube IDs to ping + """ + message = {} + + for cube_id in cube_ids: + # Send a ping to a single cubesat + logger.info(f"Pinging {cube_id}...") + message["current_time"] = time.monotonic() + message["cube_id"] = cube_id + message["command"] = SEND_PING + encoded_message = json.dumps(message, separators=(",", ":")).encode("utf-8") + + if not uhf_packet_manager.send(encoded_message): + logger.warning(f"Failed to send ping to {cube_id}") + sleep_helper.safe_sleep(1) # Wait before trying the next one + continue + + # Listen for an immediate pong response + logger.info(f"Listening for pong from {cube_id} for 10 seconds.") + received_message = uhf_packet_manager.listen(10) + + if received_message: + try: + decoded_message = json.loads(received_message.decode("utf-8")) + sender_id = decoded_message.get("cube_id") + command = decoded_message.get("command") + + if command == "pong" and sender_id == cube_id: + logger.info(f"Success! Received pong from {sender_id}.") + else: + logger.warning(f"Received unexpected message: {decoded_message}") + except (json.JSONDecodeError, UnicodeError) as e: + logger.error("Could not process received message", e) + else: + logger.warning(f"No response from {cube_id} within the time limit.") + + # Wait before contacting the next satellite + logger.debug("Waiting 5 seconds before contacting next satellite.") + sleep_helper.safe_sleep(5) + + +def send_leaderboard_power_loop( + logger, uhf_packet_manager, sleep_helper, cube_ids=DEFAULT_CUBE_IDS +): + """ + Execute the leaderboard sending power loop. + + Args: + logger: Logger instance for logging messages + uhf_packet_manager: Packet manager for UHF communication + sleep_helper: Helper for safe sleep operations + cube_ids: List of cube IDs to contact + """ + message = {} + + for cube_id in cube_ids: + # Send a leaderboard request to a single cubesat + logger.info(f"Requesting leaderboard from {cube_id}...") + message["current_time"] = time.monotonic() + message["cube_id"] = cube_id + message["command"] = SEND_LEADERBOARD + encoded_message = json.dumps(message, separators=(",", ":")).encode("utf-8") + + if not uhf_packet_manager.send(encoded_message): + logger.warning(f"Failed to send leaderboard request to {cube_id}") + sleep_helper.safe_sleep(1) + continue + + # Listen for response + logger.info(f"Listening for response from {cube_id} for 10 seconds.") + received_message = uhf_packet_manager.listen(10) + + if received_message: + try: + decoded_message = json.loads(received_message.decode("utf-8")) + # sender_id = decoded_message.get("cube_id") + print(decoded_message) + except (json.JSONDecodeError, UnicodeError) as e: + logger.error("Could not process received message", e) + else: + logger.warning(f"No response from {cube_id} within the time limit.") + + # Wait before contacting the next satellite + logger.debug("Waiting 5 seconds before contacting next satellite.") + sleep_helper.safe_sleep(5) + + +def listener_nominal_power_loop( + logger, uhf_packet_manager, sleep_helper, my_cubesat_id=CUBESAT_ID +): + """ + Execute the listener nominal power loop for responding to pings. + + Args: + logger: Logger instance for logging messages + uhf_packet_manager: Packet manager for UHF communication + sleep_helper: Helper for safe sleep operations + my_cubesat_id: ID of this cubesat + """ + received_message = uhf_packet_manager.listen(5) + + if received_message: + try: + decoded_message = json.loads(received_message.decode("utf-8")) + logger.info(f"Received message: {decoded_message}") + + cubesat_id = decoded_message.get("cube_id") + if cubesat_id == my_cubesat_id: + command = decoded_message.get("command") + + if command == SEND_PING: + logger.info(f"Received ping from {cubesat_id}") + response_message = { + "current_time": time.monotonic(), + "cube_id": my_cubesat_id, + "command": "pong", + } + encoded_response = json.dumps( + response_message, separators=(",", ":") + ).encode("utf-8") + sleep_helper.safe_sleep(1) + uhf_packet_manager.send(encoded_response) + + elif command == SEND_LEADERBOARD: + send_leaderboard(sleep_helper, uhf_packet_manager, my_cubesat_id) + elif command == UPDATE_LEADERBOARD: + update_leaderboard(decoded_message.get("name")) + except (ValueError, UnicodeError) as e: + logger.error("Failed to decode message", e) + + +# TO DO: turn this into a real test for our flash memory +def test_sd(): + """Test SD card functionality and file operations.""" + print(f"Current directory contents: {os.listdir()}") + print("___________") + + try: + os.mkdir("sd/new_folder") + print(f"SD directory contents: {os.listdir('sd')}") + except OSError as e: + print(f"Error creating directory: {e}") + + print("_______________") + + try: + with open("boot_out.txt", "r") as file: + print(file.read()) + except FileNotFoundError: + print("boot_out.txt not found") + except Exception as e: + print(f"Error reading boot_out.txt: {e}") + + +def update_leaderboard(name): + """ + Update the leaderboard with a new entry or increment existing entry. + + Args: + name: Name to add or increment in the leaderboard + """ + leaderboard_file = "sd/leaderboard.json" + + try: + # Load existing leaderboard + with open(leaderboard_file, "r") as file: + current_leaderboard = json.load(file) + except FileNotFoundError: + # Create new leaderboard if file doesn't exist or is invalid + current_leaderboard = {} + + # Update the leaderboard + if name not in current_leaderboard: + current_leaderboard[name] = 1 + else: + current_leaderboard[name] += 1 + + # Save updated leaderboard + try: + with open(leaderboard_file, "w") as file: + json.dump(current_leaderboard, file) + except Exception as e: + print(f"Error saving leaderboard: {e}") + + +def display_leaderboard(): + print("LEADERBOARD:") + + try: + with open("sd/leaderboard.json", "r") as file: + current_leaderboard = json.load(file) + except (FileNotFoundError, json.JSONDecodeError): + print("No leaderboard data available") + return + + sorted_leaderboard = sorted( + current_leaderboard.items(), key=lambda kv: (-kv[1], kv[0]) + ) + + for i, (name, score) in enumerate(sorted_leaderboard, 1): + print(f"{i}. {name}: {score}") + + +def send_leaderboard(sleep_helper, uhf_packet_manager, my_cubesat_id=CUBESAT_ID): + """ + Send the current leaderboard data via radio. + + Args: + sleep_helper: Helper for safe sleep operations + uhf_packet_manager: Packet manager for UHF communication + my_cubesat_id: ID of this cubesat + """ + try: + with open("sd/leaderboard.json", "r") as file: + current_leaderboard = json.load(file) + except (FileNotFoundError, json.JSONDecodeError): + current_leaderboard = {} + + response_message = { + "current_time": time.monotonic(), + "cube_id": my_cubesat_id, + "command": SEND_LEADERBOARD, + "leaderboard": current_leaderboard, + } + + encoded_response = json.dumps(response_message, separators=(",", ":")).encode( + "utf-8" + ) + + print(f"Sending leaderboard: {encoded_response}") + sleep_helper.safe_sleep(1) + uhf_packet_manager.send(encoded_response) + + # Clear the leaderboard after sending + try: + with open("sd/leaderboard.json", "w") as file: + file.write("{}") + except Exception as e: + print(f"Error clearing leaderboard: {e}")