First, the code imports all the required libraries.
import time import random import board import usb_cdc import digitalio import adafruit_rfm9x
Next, the radio is set up. If you would like to use this with a board other than the RFM RP2040, change the pins that CS and RESET use.
# radio setup
RADIO_FREQ_MHZ = 915.0

# On-board LED; the main loop lights it while a packet is being handled.
LED = digitalio.DigitalInOut(board.LED)
LED.direction = digitalio.Direction.OUTPUT

# Chip-select and reset lines of the radio. Change these two pins when using
# a board other than the RFM RP2040.
CS = digitalio.DigitalInOut(board.RFM_CS)
RESET = digitalio.DigitalInOut(board.RFM_RST)

# The radio driver needs an SPI bus object; `spi` was previously used without
# ever being defined, which raises NameError at start-up.
# NOTE(review): assumes the radio is wired to the board's default SPI bus --
# confirm when porting to another board.
spi = board.SPI()

rfm9x = adafruit_rfm9x.RFM9x(spi, CS, RESET, RADIO_FREQ_MHZ)
rfm9x.tx_power = 23
The code now initializes a few important variables before entering the main loop.
# Wait to receive packets. Note that this library can't receive data at a fast
# rate, in fact it can only receive and process one 252 byte packet at a time.
# This means you should only use this for low bandwidth scenarios, like sending
# and receiving a single message at a time.
print("Waiting for packets...")

# Bytes read from the USB serial console that have not been transmitted yet.
MESSAGE = b""

# ID of the most recently transmitted message; None when nothing is pending.
ID = None
In the first part of the main loop, the code checks to see if it has received any messages over the serial port. If it has, it forwards the message to the RFM to be sent over radio.
# Largest payload the radio library accepts in a single packet.
MAX_PACKET_BYTES = 252

while True:
    # Read whatever the host has written to the USB serial console so far.
    char = usb_cdc.console.read(usb_cdc.console.in_waiting)
    if char:
        MESSAGE += char
        # A trailing carriage return marks the end of a message from the host.
        if char[-1:] == b"\r":
            MESSAGE = MESSAGE[:-1]
            ID = random.randint(0, 1000)
            payload = bytes(f"{ID}|", "utf-8") + MESSAGE
            if len(payload) > MAX_PACKET_BYTES:
                # Sending an oversized packet would fail inside the radio
                # library and stop this loop; drop the message instead.
                print("Message too long to send")
                ID = None
                MESSAGE = b""
                continue
            rfm9x.send(payload)
            # Echo "<id>|<payload>" so the host learns the ID that was used.
            print(f"{ID}|{MESSAGE.decode()}")
            timestamp = time.monotonic()
            sent = MESSAGE
            MESSAGE = b""
            # Skip the receive step for this iteration.
            continue
The last bit of code in this file handles incoming radio packets. When a message is received, it sends a reply to confirm receipt, then unpacks the message and sends it over serial along with a bit of other information about the signal strength.
⠀⠀⠀packet = rfm9x.receive() if packet is None: # Packet has not been received LED.value = False else: # Received a packet! LED.value = True try: PACKET_TEXT = str(packet, "ascii") except UnicodeError: print("error") continue print(PACKET_TEXT) mess_id, text = PACKET_TEXT.split("|") if mess_id != "-1": rfm9x.send(bytes(f"-1|{mess_id}", "utf-8")) print(f"Received: {PACKET_TEXT}") else: print(f"Delivered: {text}") ID = None rssi = rfm9x.last_rssi print(f"RSSI: {rssi} dB")
This file is one of the two files that run on your computer. It starts out by importing all the necessary libraries.
import os
import pathlib
import subprocess
import time

import PySimpleGUI as sg
import serial
from cryptography.fernet import Fernet
Then, the phone number to send to is set and the secret key is either created or read.
# Replace NUMBER with your phone number including country code
NUMBER = "REPLACE_ME"

# Load the Fernet key from secret.key; on first run, generate one and save it.
try:
    with open("secret.key", "rb") as key_file:
        key = key_file.read()
except FileNotFoundError:
    key = Fernet.generate_key()
    with open("secret.key", "wb") as key_file:
        key_file.write(key)
Before it goes into the main function, there are a few last variables to initialize.
# Cipher used to encrypt outgoing and decrypt incoming messages.
aes = Fernet(key)

# Which /dev/ttyACM<N> device the radio board is attached to.
port = int(input("port: /dev/ttyACM"))
# Non-zero enables forwarding of received messages to Signal.
s = int(input("signal?"))

ser = serial.Serial(
    port=f"/dev/ttyACM{port}",
    baudrate=115200,
    timeout=0.050,
)
last_time = time.monotonic()
Then the main function starts. The code sets up the GUI and initializes some important variables before the main loop starts.
def ChatBotWithHistory():  # pylint: disable=too-many-statements,too-many-branches,too-many-locals
    """Build the chat window and set up the state used by the main loop."""
    # ------- Make a new Window ------- #
    # give our form a spiffy set of colors
    sg.theme("DarkPurple4")

    # Rows are created top to bottom, in the order they appear in the window.
    title_row = [sg.Text("Encrypted LoRa Messaging Client", size=(40, 1))]
    history_row = [sg.Output(size=(127, 30), key="history", font=("Helvetica 10"))]
    clear_row = [sg.Button("CLEAR", button_color=(sg.YELLOWS[0], sg.BLUES[0]))]
    input_row = [
        sg.ML(size=(85, 5), enter_submits=True, key="query", do_not_clear=False),
        sg.Button(
            "SEND",
            button_color=(sg.YELLOWS[0], sg.BLUES[0]),
            bind_return_key=True,
        ),
        sg.Button("EXIT", button_color=(sg.YELLOWS[0], sg.GREENS[0])),
    ]
    layout = [title_row, history_row, clear_row, input_row]

    window = sg.Window(
        "Chat window with history",
        layout,
        default_element_size=(30, 2),
        font=("Helvetica", " 13"),
        default_button_element_size=(8, 2),
        return_keyboard_events=True,
    )

    # ---===--- Loop taking in user input and using it --- #
    # Previously sent messages, recalled into the input box with Up/Down.
    command_history = []
    history_offset = 0
    # ID the radio assigned to the most recently sent message.
    last_id = None
In the main loop, the code first tries to read from the window. Then it tries reading from the serial port and forwards the message along to Signal.
⠀⠀⠀while True: event, value = window.read(timeout=0.05) while ser.in_waiting: data_in = ser.readline() mess = data_in.decode()[:-1] try: if mess.startswith("Received"): mess = mess.split(" ", 1)[1] mess_id, text = mess.split("|", 1) text = aes.decrypt(text.encode()).decode() print(f"> {text}") if s: os.system(f'signal-cli send -m "{text}" {NUMBER}') elif "|" in mess: mess_id, text = mess.split("|", 1) last_id = int(mess_id) if "Delivered" in mess: mess_id = int(mess.split(" ", 1)[1]) if mess_id == last_id: print("Delivered") if "RSSI" in mess: print(mess + "\n") except Exception as error: # pylint: disable=broad-except print(error)
This branch checks to see if a new message has been received by the Signal account. If one has, it displays it and sends it to the RP2040 via serial.
⠀⠀⠀⠀⠀⠀if os.path.isfile("message"): print("entered") time.sleep(0.1) path = pathlib.Path("message") with path.open() as file: signal = file.readline().rstrip() print(f"< {signal}") ser.write(aes.encrypt(signal.encode())) ser.write(signal.encode()) ser.write("\r".encode()) command_history.append(signal) history_offset = len(command_history) - 1 # manually clear input because keyboard events blocks clear window["query"].update("") os.system("rm message")
This branch handles what to do when certain window inputs are received. If a message is to be sent directly from the window, it encrypts it and sends it. If the user has hit the 'Exit' button, it closes the window. If the user presses Up or Down, it steps through previously sent messages in the input box.
⠀⠀⠀⠀⠀⠀⠀if event: if event == "SEND": query = value["query"].rstrip() print(f"< {query}") ser.write(aes.encrypt(query.encode())) ser.write(query.encode()) ser.write("\r".encode()) command_history.append(query) history_offset = len(command_history) - 1 # manually clear input because keyboard events blocks clear window["query"].update("") # EXECUTE YOUR COMMAND HERE elif event in (sg.WIN_CLOSED, "EXIT"): # quit if exit event or X break elif "Up" in event and len(command_history) != 0: command = command_history[history_offset] # decrement is not zero history_offset -= 1 * (history_offset > 0) window["query"].update(command) elif "Down" in event and len(command_history) != 0: # increment up to end of list history_offset += 1 * (history_offset < len(command_history) - 1) command = command_history[history_offset] window["query"].update(command) elif event == "CLEAR": window["history"].update("") elif "Escape" in event: window["query"].update("")
Finally, the above function is run.
# Start the GUI messaging client.
ChatBotWithHistory()
This is a helper file specifically to check signal for new messages in a non-blocking way. It starts by importing the required libraries.
from subprocess import check_output import os
The code then enters the main loop, starting by checking if there are new messages from Signal. If there are, it formats them in a more desirable way and writes them to a file, named message.
while True:
    # Run signal-cli without a shell; the command line is fixed, so an
    # argument list is all that is needed.
    SIGNAL = check_output(["signal-cli", "receive"]).decode("utf-8")
    if "Body" in SIGNAL:
        # Keep the text after the first "Body" marker, up to the end of that
        # line, skipping the two characters that follow the marker.
        signal_msg = SIGNAL.split("Body")[1].split("\n")[0][2:]
        print(signal_msg)
        # Write to a temporary file and rename it into place, so a reader that
        # polls for "message" never sees a half-written file. open(..., "w")
        # creates the file itself, so no separate `touch` is needed.
        with open("message.tmp", "w") as F:
            F.write(signal_msg)
        os.replace("message.tmp", "message")
Page last edited March 08, 2024
Text editor powered by tinymce.