The code for this project is split into 3 files: code.py, irc_client.py, and curses_irc_client.py. Each is responsible for its own part of the work required to make the full usable IRC client application. This page will give a high-level explanation of each. The code also contains comments with more details about specific sections and components if you want to go deeper.
code.py
This file is what gets launched when CircuitPython runs the application. It is responsible for initializing the hardware that the app uses like the ESP32-C6 co-processor for WiFi access, and the DAC for playing audio. It also creates the displayio elements that make up the user interface.
The Adafruit_CircuitPython_ColorTerminal library is used for the main Terminal instance that the interface is drawn into. This library supports colored terminal text using ANSI color escape codes. The IRC app uses it to have a few different colors to show nicknames of users in the channel, which makes the conversation easier to follow at a glance. At the end of code.py it calls the run_irc_client() function to launch the curses application.
curses_irc_client.py
This file contains the curses application for the IRC client. This layer has a main loop section that gets input from the user and takes action as necessary. It also calls irc_client.update() to check for new data that has come in from the server; more on that in the next section. curses_irc_client.py reads input from the keyboard as you type and enters the message you type on the user input line. When you press enter, it handles the message you have entered, passing it off to irc_client.send_message() if it is a normal message, or handling it as a command if it starts with a forward slash like /beep. During the main loop this will get a page of messages from the irc_client.message_buffer and display them on the screen.
# SPDX-FileCopyrightText: 2025 Tim Cocks for Adafruit Industries
# SPDX-License-Identifier: MIT
import time
import adafruit_dang as curses
from irc_client import IRCClient
# ANSI escape sequences used to draw the status bar at the bottom of the screen.
ANSI_BLACK_ON_GREY = "\x1b[30;100m"
ANSI_RESET = "\x1b[0m"
class Window:
    """
    Rectangular view onto a larger text buffer, with basic scrolling support.

    ``row`` / ``col`` hold the buffer coordinates of the window's top-left
    corner; ``n_rows`` / ``n_cols`` hold its size.
    """

    def __init__(self, n_rows, n_cols, row=0, col=0):
        self.n_rows, self.n_cols = n_rows, n_cols
        self.row, self.col = row, col

    @property
    def bottom(self):
        """Buffer row index of the last row covered by the window."""
        return self.row + self.n_rows - 1

    def up(self, cursor):  # pylint: disable=invalid-name
        """Scroll up one row when the cursor sits just above the window."""
        cursor_just_above = cursor.row == self.row - 1
        if cursor_just_above and self.row > 0:
            self.row -= 1

    def down(self, buffer, cursor):
        """Scroll down one row when the cursor sits just below the window."""
        cursor_just_below = cursor.row == self.bottom + 1
        if cursor_just_below and self.bottom < len(buffer) - 1:
            self.row += 1

    def horizontal_scroll(self, cursor, left_margin=5, right_margin=2):
        """Recompute the horizontal offset from the cursor column."""
        usable_width = self.n_cols - right_margin
        n_pages = cursor.col // usable_width
        offset = n_pages * self.n_cols - right_margin - left_margin
        # never scroll to a negative column
        self.col = offset if offset > 0 else 0

    def translate(self, cursor):
        """Return the cursor position as (row, col) relative to the window."""
        rel_row = cursor.row - self.row
        rel_col = cursor.col - self.col
        return rel_row, rel_col
def _run_slash_command(irc_client, user_input, show_user_message):
    """
    Execute one slash command typed on the input line.

    Missing or malformed arguments are reported through ``show_user_message``
    instead of raising, so a typo cannot crash the application.

    :param irc_client: The IRCClient the command is applied to.
    :param str user_input: Full input line, starting with "/".
    :param show_user_message: Callable taking a str to show in the status bar.
    :return: True when the input line should be cleared, False when it should
      be left in place (unrecognised command, so the user can correct it).
    """
    parts = user_input.split(" ", 1)
    command = parts[0]
    argument = parts[1] if len(parts) >= 2 else ""

    if command in {"/j", "/join"}:
        if argument == "":
            show_user_message("Invalid /join arg. Use: /join <channel>")
        elif argument == irc_client.config["channel"]:
            show_user_message("Already in channel")
        else:
            irc_client.join(argument)
        return True

    if command == "/msg":
        msg_parts = argument.split(" ", 1)
        if len(msg_parts) == 2 and msg_parts[0] != "":
            irc_client.send_dm(msg_parts[0], msg_parts[1])
        else:
            show_user_message("Invalid /msg args. Use: /msg <user> <message>")
        return True

    if command == "/beep":
        if argument != "":
            irc_client.send_dm(argument, "*Beep*\x07")
        else:
            show_user_message("Invalid /beep arg. Use: /beep <user>")
        return True

    # commands that take exactly one user name and map 1:1 to a client method
    single_user_commands = {
        "/op": irc_client.op,
        "/deop": irc_client.deop,
        "/kick": irc_client.kick,
        "/ban": irc_client.ban,
        "/unban": irc_client.unban,
        "/whois": irc_client.whois,
    }
    if command in single_user_commands:
        if argument != "":
            single_user_commands[command](argument)
        else:
            show_user_message(f"Invalid {command} arg. Use: {command} <user>")
        return True

    show_user_message(f"Unknown command: {command}")
    return False


def irc_client_main(
    stdscr,
    radio,
    irc_config,
    terminal_tilegrid=None,
    audio_interface=None,
    beep_wave=None,
):
    # pylint: disable=too-many-locals, too-many-branches, too-many-statements
    """
    Main curses IRC client application loop.

    Creates an IRCClient, then loops forever: polls the client for new
    messages, redraws the visible page / input line / status bar, and handles
    one key press per iteration.

    :param stdscr: Curses-style screen object; ``erase()``, ``addstr()`` and
      ``getkey()`` are used.
    :param radio: Network radio passed through to IRCClient.
    :param dict irc_config: IRC configuration passed through to IRCClient;
      "username", "server" and "channel" are also shown in the status bar.
    :param terminal_tilegrid: Object whose ``height`` and ``width`` give the
      terminal size in characters. Despite the ``None`` default it is
      required, because its attributes are read unconditionally.
    :param audio_interface: Optional audio output passed through to IRCClient.
    :param beep_wave: Optional beep sample passed through to IRCClient.
    :raises KeyboardInterrupt: Re-raised after disconnecting from the server.
    """
    irc_client = IRCClient(
        radio, irc_config, audio_interface=audio_interface, beep_wave=beep_wave
    )
    irc_client.connect()

    window = Window(terminal_tilegrid.height, terminal_tilegrid.width)
    stdscr.erase()

    # cache of what is currently drawn on each row, to skip redundant writes
    img = [None] * window.n_rows
    # rows available for chat messages; the last two rows hold the input
    # line and the status bar
    page_len = window.n_rows - 2
    # number of lines moved by one Page Up / Page Down key press
    page_scroll_lines = 6

    status_bar = {
        "user_message": None,
        "user_message_shown_time": 0,
    }
    # scroll offset relative to the newest message: 0 shows the latest page,
    # negative values show older messages
    cur_row_index = 0
    user_input = ""

    def show_user_message(message):
        """
        Show a status message to the user
        """
        status_bar["user_message"] = message + (
            " " * (window.n_cols - 1 - len(message))
        )
        status_bar["user_message_shown_time"] = time.monotonic()

    def setline(row, line):
        """
        Set a line of text in the terminal window.
        """
        if img[row] == line:
            return
        img[row] = line
        # pad with spaces so leftovers of a longer previous line are erased
        line += " " * (window.n_cols - len(line) - 1)
        stdscr.addstr(row, 0, line)

    def get_page(row_index):
        """
        Get a page of messages from the message buffer.
        """
        page_start = max((len(irc_client.message_buffer) + row_index) - page_len, 0)
        page_end = page_start + page_len
        return irc_client.message_buffer[page_start:page_end]

    # pylint: disable=too-many-nested-blocks
    try:
        # main application loop
        while True:
            lastrow = 0
            lines_added = irc_client.update()

            cur_page = get_page(cur_row_index)
            if lines_added > 0 and len(cur_page) < page_len:
                cur_row_index = max(cur_row_index - lines_added, 0)
                cur_page = get_page(cur_row_index)

            for row, line in enumerate(cur_page):
                lastrow = row
                setline(row, line)

            # blank out any message rows below the last drawn one
            for row in range(lastrow + 1, page_len):
                setline(row, "")

            user_input_row = window.n_rows - 2
            if user_input:
                setline(user_input_row, user_input)
            else:
                setline(user_input_row, " " * (window.n_cols - 1))

            user_message_row = terminal_tilegrid.height - 1
            if status_bar["user_message"] is None:
                message = f" {irc_config['username']} | {irc_config['server']} | {irc_config['channel']}"  # pylint: disable=line-too-long
                message += " " * (terminal_tilegrid.width - len(message) - 1)
                line = f"{ANSI_BLACK_ON_GREY}{message}{ANSI_RESET}"
            else:
                line = f"{ANSI_BLACK_ON_GREY}{status_bar['user_message']}{ANSI_RESET}"
                # status messages expire after 3 seconds
                if status_bar["user_message_shown_time"] + 3.0 < time.monotonic():
                    status_bar["user_message"] = None
            setline(user_message_row, line)

            # read from the keyboard
            k = stdscr.getkey()
            if k is None:
                continue

            if len(k) == 1 and " " <= k <= "~":
                user_input += k
            elif k == "\n":  # enter key pressed
                if not user_input:
                    # nothing typed: do not send an empty message
                    pass
                elif not user_input.startswith("/"):
                    print(f"sending: {user_input}")
                    irc_client.send_message(user_input)
                    user_input = ""
                elif _run_slash_command(irc_client, user_input, show_user_message):
                    user_input = ""
            elif k in ("KEY_BACKSPACE", "\x7f", "\x08"):
                user_input = user_input[:-1]
            elif k in ("KEY_UP", "KEY_PGUP"):
                step = 1 if k == "KEY_UP" else page_scroll_lines
                if len(irc_client.message_buffer) > page_len:
                    page_start = (
                        len(irc_client.message_buffer) + cur_row_index
                    ) - page_len
                    if page_start > 0:
                        # never scroll past the oldest message
                        cur_row_index -= min(step, page_start)
            elif k == "KEY_DOWN":
                if cur_row_index < 0:
                    cur_row_index += 1
            elif k == "KEY_PGDN":
                # never scroll past the newest message
                cur_row_index = min(cur_row_index + page_scroll_lines, 0)
            else:
                print(f"unknown key: {k}")

    except KeyboardInterrupt as exc:
        irc_client.disconnect()
        raise KeyboardInterrupt from exc
def run_irc_client(
    radio, irc_config, terminal, terminal_tilegrid, audio_interface=None, beep_wave=None
):
    """
    Launch the curses IRC client inside the given terminal.

    Hands ``irc_client_main`` and its arguments to
    ``curses.custom_terminal_wrapper`` and returns whatever that call returns.
    """
    # positional arguments forwarded to irc_client_main after the screen object
    app_args = (radio, irc_config, terminal_tilegrid, audio_interface, beep_wave)
    return curses.custom_terminal_wrapper(terminal, irc_client_main, *app_args)
irc_client.py
This file contains the class which manages the network connection with the IRC server. The higher level code in the curses application will call functions on this class instead of directly interacting with the server. This class could be re-purposed to make a different IRC client program with a different user interface. It provides the message_buffer property which will contain a list of lines to show on the display. The higher level code must call the update() function from the main loop in order to fetch new messages from the server.
Whenever data is received from the server, it goes into the process_message() function to decide how to handle it. Some messages are shown to the user, others are internal server information not shown visually. Others like PING require that the client take action in response, which is handled automatically.
# SPDX-FileCopyrightText: 2025 Tim Cocks for Adafruit Industries
# SPDX-License-Identifier: MIT
import time
import adafruit_connection_manager
# ANSI foreground color sequences, ESC[30m through ESC[36m, indexed 0-6.
ANSI_ESCAPE_CODES = [f"\x1b[{sgr_code}m" for sgr_code in range(30, 37)]
# ANSI sequence that resets text attributes back to the terminal default.
ANSI_RESET = "\x1b[0m"
class IRCClient:
    """
    Handles interaction with IRC Server and makes incoming messages available.

    Display-ready lines are appended to ``message_buffer``; the owner must call
    :meth:`update` regularly to pull new data from the socket.

    :param radio: The network radio to connect with.
    :param dict irc_config: Dictionary containing IRC configuration for
      server, port, username and channel. "port" defaults to 6667 and
      "timeout" to 120 when absent (the dictionary is modified in place).
    :param audio_interface: Optional interface to play audio from for beep messages
    :param beep_wave: Optional wave file to use for beep messages
    :param int max_line_length: Maximum characters per line to format messages into.
    :raises ValueError: If a required config key is missing.
    """

    def __init__(
        self,
        radio,
        irc_config,
        audio_interface=None,
        beep_wave=None,
        max_line_length=120,
    ):
        self.radio = radio
        self.config = irc_config

        required = {"username", "server", "channel"}
        for key in required:
            if key not in self.config:
                raise ValueError(
                    f"missing required config key. Required keys are: {required}"
                )

        if "port" not in self.config:
            self.config["port"] = 6667
        # NOTE(review): "timeout" is stored in the config but the socket below
        # is opened with a fixed 0.01 s timeout - confirm whether that is intended.
        if "timeout" not in self.config:
            self.config["timeout"] = 120

        self.pool = adafruit_connection_manager.get_radio_socketpool(radio)
        self.connection_manager = adafruit_connection_manager.get_connection_manager(
            self.pool
        )
        ssl_context = adafruit_connection_manager.get_radio_ssl_context(radio)
        print(f"Connecting to {self.config['server']}:{self.config['port']}...")
        # the very short timeout keeps reads in update() from stalling the UI loop
        self.socket = self.connection_manager.get_socket(
            self.config["server"],
            self.config["port"],
            "",
            timeout=0.01,
            is_ssl=True,
            ssl_context=ssl_context,
        )
        print("Connected")

        # color to use for next unique username
        self.next_color_index = 4
        # map of unique usernames to color
        self.user_color_map = {}
        # buffer for incoming data until it's a full line
        self.line_buffer = ""
        # buffer for full incoming chat messages
        self.message_buffer = []
        # whether to show whois reply message on screen
        self.show_whois_reply = False

        self.audio_interface = audio_interface
        # always defined, so the attribute exists even without an audio interface
        self.beep_wave = beep_wave
        self.max_line_length = max_line_length

    def connect(self):
        """
        Register with the IRC Server by sending the NICK and USER commands.
        """
        self.socket.send(f"NICK {self.config['username']}\r\n".encode("utf-8"))
        self.socket.send(
            f"USER {self.config['username']} 0 * :{self.config['username']}\r\n".encode(
                "utf-8"
            )
        )

    def disconnect(self):
        """
        Disconnect from IRC Server. The socket is closed even if sending the
        QUIT command fails.
        """
        try:
            self.socket.send("QUIT :Goodbye\r\n".encode("utf-8"))
        finally:
            self.socket.close()

    def readlines(self):
        """
        Read incoming data from the socket and return a list of lines read.

        Incomplete trailing data stays in ``line_buffer`` until the rest of
        the line arrives. Empty lines are dropped.

        :return: List of complete lines, without the trailing CRLF.
        :raises RuntimeError: If the server closed the connection.
        """
        lines = []
        data = self.socket.recv(4096).decode("utf-8")
        if not data:
            raise RuntimeError("Connection closed by server")

        self.line_buffer += data

        # split off every complete line; the remainder stays buffered
        while "\r\n" in self.line_buffer:
            line, self.line_buffer = self.line_buffer.split("\r\n", 1)
            if line:
                lines.append(line)
        return lines

    def update(self):
        """
        Check for updates from the server. Main loop of the program should call this.

        :return: Number of lines added to the display.
        :raises RuntimeError: On socket errors other than a read timeout, or
          when the server closed the connection.
        """
        updated_display_lines = 0
        try:
            lines = self.readlines()
            for line in lines:
                updated_display_lines += self.process_message(line)
        except OSError as e:
            # a timeout just means no data was waiting; anything else is fatal
            if "ETIMEDOUT" not in str(e):
                raise RuntimeError(e) from e
        return updated_display_lines

    def send_message(self, message):
        """
        Send a message to the channel that the user is in.
        """
        irc_command = f"PRIVMSG {self.config['channel']} :{message}\r\n"
        self.socket.send(irc_command.encode("utf-8"))
        # echo the message locally by feeding it through the normal
        # incoming-message path (without the trailing CRLF)
        self.process_message(
            f":{self.config['username']}!~{self.config['username']}@localhost "
            + irc_command[:-2]
        )

    def send_dm(self, to_user, message):
        """
        Send a direct message to a specified user.
        """
        irc_command = f"PRIVMSG {to_user} :{message}\r\n"
        self.socket.send(irc_command.encode("utf-8"))
        color = self.get_color_for_user(to_user)
        self.message_buffer.append(f"DM out: <{color}{to_user}{ANSI_RESET}> {message}")

    def op(self, user):
        """
        Make specified user an operator in the channel that the user is in.
        You must already be an operator to grant operator privilege.
        """
        op_cmd = f"MODE {self.config['channel']} +o {user}\r\n"
        self.socket.send(op_cmd.encode("utf-8"))

    def deop(self, user):
        """
        Remove operator privilege from the specified user for this channel.
        """
        deop_cmd = f"MODE {self.config['channel']} -o {user}\r\n"
        self.socket.send(deop_cmd.encode("utf-8"))

    def kick(self, user):
        """
        Kick a specified user from the channel.
        """
        kick_cmd = f"KICK {self.config['channel']} {user}\r\n"
        self.socket.send(kick_cmd.encode("utf-8"))

    def get_technical_name(self, nickname):
        """
        Get the full technical name of a user given a nickname.

        Sends a WHOIS query and waits up to 3 seconds for a read to return.
        Every line from that read other than the first well-formed 311 reply
        is handed to :meth:`process_message`, so nothing that arrived in the
        same read is lost.

        :param str nickname: Nickname to look up.
        :return: Ban mask of the form ``*!user@host``, or None if no 311 reply
          was found in the data that was read.
        :raises RuntimeError: On socket errors other than a read timeout.
        """
        start_time = time.monotonic()
        whois_cmd = f"WHOIS {nickname}\r\n"
        self.socket.send(whois_cmd.encode("utf-8"))

        whois_resp_lines = None
        while whois_resp_lines is None and start_time + 3.0 > time.monotonic():
            try:
                whois_resp_lines = self.readlines()
            except OSError as e:
                if "ETIMEDOUT" not in str(e):
                    raise RuntimeError(e) from e

        if whois_resp_lines is None:
            return None

        technical_name = None
        for line in whois_resp_lines:
            line = line.lstrip("\0")
            parts = line.split(" ", 2)
            if len(parts) < 2:
                continue
            if technical_name is None and parts[1] == "311" and len(parts) >= 3:
                # 311 params: "<our nick> <nick> <user> <host> * :<real name>"
                fields = parts[2].split(" ")
                if len(fields) >= 4:
                    technical_name = f"*!{fields[2]}@{fields[3]}"
                    continue
            self.process_message(line)
        return technical_name

    def ban(self, user):
        """
        Ban the specified user from the channel
        """
        technical_name = self.get_technical_name(user)
        if technical_name is not None:
            ban_cmd = f"MODE {self.config['channel']} +b {technical_name}\r\n"
            self.socket.send(ban_cmd.encode("utf-8"))
        else:
            self.message_buffer.append(
                f"{ANSI_RESET} Error: failed whois lookup for ban"
            )

    def unban(self, user):
        """
        Unban the specified user from the channel
        """
        technical_name = self.get_technical_name(user)
        if technical_name is not None:
            ban_cmd = f"MODE {self.config['channel']} -b {technical_name}\r\n"
            self.socket.send(ban_cmd.encode("utf-8"))
        else:
            self.message_buffer.append(
                f"{ANSI_RESET} Error: failed whois lookup for unban"
            )

    def whois(self, user):
        """
        Run a whois query on the specified user. The next 311 reply handled by
        process_message() is shown on screen.
        """
        self.show_whois_reply = True
        whois_cmd = f"WHOIS {user}\r\n"
        self.socket.send(whois_cmd.encode("utf-8"))

    def leave_channel(self):
        """
        Leave the channel
        """
        self.socket.send(f"PART {self.config['channel']}\r\n".encode("utf-8"))

    def join(self, new_channel=None):
        """
        Join the specified channel. This will leave the prior channel.

        :param new_channel: Channel to switch to. When None (or equal to the
          current channel) the configured channel is joined.
        """
        if new_channel is not None and new_channel != self.config["channel"]:
            self.leave_channel()
            self.config["channel"] = new_channel

        print(f"Joining channel {self.config['channel']}...")
        self.socket.send(f"JOIN {self.config['channel']}\r\n".encode("utf-8"))
        self.message_buffer.append(f"{ANSI_RESET}* Joined {self.config['channel']} *")

    def get_color_for_user(self, username):
        """
        Get the color to use for the specified username.

        Each new username is assigned the next color index, cycling through
        indexes 1-6 of ANSI_ESCAPE_CODES.
        """
        if username not in self.user_color_map:
            self.user_color_map[username] = self.next_color_index
            self.next_color_index += 1
            if self.next_color_index > 6:
                self.next_color_index = 1

        return ANSI_ESCAPE_CODES[self.user_color_map[username]]

    @staticmethod
    def split_string_chunks(s, chunk_size):
        """
        Split a string into chunks of specified size.
        """
        return [s[i : i + chunk_size] for i in range(0, len(s), chunk_size)]

    @staticmethod
    def _nick_from_prefix(prefix):
        """
        Extract the nickname from an IRC prefix such as ":nick!user@host".
        """
        if prefix.startswith(":"):
            prefix = prefix[1:]
        if "!" in prefix:
            prefix = prefix.split("!")[0]
        return prefix

    def process_message(self, message):
        # pylint: disable=too-many-branches, too-many-statements, too-many-locals
        """
        Process an incoming IRC message.

        Malformed lines (missing parameters) are tolerated rather than raising.

        :param message: The message that came from the IRC server.
        :return lines_added: The number of lines added to the display
        """
        lines_added = 0
        message = message.lstrip("\x00")
        print(f"RAW: {message.encode('utf-8')}")

        # Handle PING messages (keep connection alive)
        if message.startswith("PING"):
            # swap only the leading command word; the token must be echoed
            # back unchanged even if it contains the text "PING"
            pong_response = "PONG" + message[4:]
            self.socket.send(f"{pong_response}\r\n".encode("utf-8"))
            print("Responded to PING")
            return 0

        # Parse IRC message format: :prefix COMMAND params
        parts = message.split(" ", 2)
        if len(parts) < 2:
            return lines_added

        command = parts[1]
        params = parts[2] if len(parts) >= 3 else None
        try:
            command_num = int(command)
        except ValueError:
            command_num = None

        # pylint: disable=too-many-nested-blocks
        # End of MOTD - now we can join the channel
        if command in {"376", "422"}:  # 422 is "no MOTD"
            self.join()

        # Welcome / server info / MOTD numerics that are shown to the user
        elif command in [
            "001",
            "002",
            "003",
            "004",
            "251",
            "252",
            "253",
            "254",
            "255",
            "265",
            "266",
            "375",
            "372",
        ]:
            if params is not None:
                welcome_text = params
                if welcome_text.startswith(":"):
                    welcome_text = welcome_text[1:]

                print(
                    f"'{welcome_text[0:11]}' startswith '{self.config['username']}' ? {welcome_text.startswith(self.config['username'])}"  # pylint: disable=line-too-long
                )
                # drop our own nickname from the front of the text
                if welcome_text.startswith(self.config["username"]):
                    welcome_text = welcome_text.replace(
                        self.config["username"], "", 1
                    )
                self.message_buffer.append(f"{welcome_text}")
                lines_added += 1
                print(f"WELCOME: {welcome_text}")

        # Channel messages
        elif command == "PRIVMSG":
            target_and_text = params.split(" ", 1) if params is not None else []
            if len(target_and_text) == 2:
                sender = self._nick_from_prefix(parts[0])
                inc_channel, inc_message = target_and_text
                # strip the first character (the ":" that introduces the text)
                message_content = inc_message[1:]

                # case-insensitive so the "*Beep*" sent by /beep is recognised
                if "*beep*" in message_content.lower():
                    if (
                        self.audio_interface is not None
                        and not self.audio_interface.playing
                    ):
                        print("playing beep")
                        self.audio_interface.play(self.beep_wave)
                        # block until the beep has finished playing
                        while self.audio_interface.playing:
                            pass
                print(f"message_content: {message_content.encode('utf-8')}")

                color = self.get_color_for_user(sender)

                if inc_channel == self.config["channel"]:
                    full_line = f"<{color}{sender}{ANSI_RESET}> {message_content}"
                    if len(full_line) < self.max_line_length:
                        self.message_buffer.append(full_line)
                        lines_added += 1
                    else:
                        chunks = self.split_string_chunks(
                            full_line, self.max_line_length
                        )
                        for chunk in chunks:
                            self.message_buffer.append(f"{ANSI_RESET}{chunk}")
                            lines_added += 1
                elif inc_channel == self.config["username"]:
                    self.message_buffer.append(
                        f"DM in: <{color}{sender}{ANSI_RESET}> {message_content}"
                    )
                    lines_added += 1
                print(f"<{sender}> {message_content}")

        # Join confirmations (logged only, not shown on the display)
        elif command == "JOIN":
            sender = self._nick_from_prefix(parts[0])
            if params is not None:
                joined_channel = params
                if joined_channel.startswith(":"):
                    joined_channel = joined_channel[1:]
                print(f"*** {sender} joined {joined_channel}")

        # error messages
        elif command_num is not None and 400 <= command_num <= 553:
            # message codes: https://www.alien.net.au/irc/irc2numerics.html
            error_text = params if params is not None else ""
            self.message_buffer.append(f"{ANSI_RESET}{command} {error_text}")
            lines_added += 1

        # whois reply
        elif self.show_whois_reply and command == "311":
            whois_fields = params.split(" ", 1) if params is not None else []
            if len(whois_fields) == 2:
                # drop the first parameter and show the rest
                self.message_buffer.append(f"{ANSI_RESET}{whois_fields[1]}")
                lines_added += 1
            self.show_whois_reply = False

        # Mode messages
        elif command == "MODE":
            if params is not None:
                action_user = parts[0].split("!", 1)[0][1:]
                mode_msg_parts = params.split(" ", 2)
                if len(mode_msg_parts) >= 3:
                    (
                        channel,  # pylint: disable=unused-variable
                        mode,
                        target_user,
                    ) = mode_msg_parts
                    action_user_color = self.get_color_for_user(action_user)
                    target_user_color = self.get_color_for_user(target_user)
                    self.message_buffer.append(
                        f"{action_user_color}{action_user}{ANSI_RESET} sets mode {mode} on {target_user_color}{target_user}{ANSI_RESET}"  # pylint: disable=line-too-long
                    )
                    lines_added += 1

        # Part messages (logged only)
        elif command == "PART":
            sender = self._nick_from_prefix(parts[0])
            if params is not None:
                left_channel = params
                print(f"*** {sender} left {left_channel}")

        # Quit messages (logged only)
        elif command == "QUIT":
            sender = self._nick_from_prefix(parts[0])
            quit_message = ""
            if params is not None:
                quit_message = params
                if quit_message.startswith(":"):
                    quit_message = quit_message[1:]
            print(f"*** {sender} quit ({quit_message})")

        return lines_added
Page last edited July 31, 2025
Text editor powered by tinymce.