import logging
import signal
from copy import deepcopy
from math import ceil, cos, radians, sin
from os import path
from sys import exit
from threading import Timer
from time import sleep
from serial import Serial, serialutil
from pitop.pulse import configuration
logger = logging.getLogger(__name__)
_initialised = False
_w = 7
_h = 7
_rotation = 0
_brightness = 1.0
_max_freq = 50 # Maximum update speed is 50 times per second
_update_rate = 0.1
_running = False
_show_enabled = True
_gamma_correction_arr = [
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
2,
2,
2,
2,
2,
2,
2,
2,
3,
3,
3,
3,
3,
3,
3,
4,
4,
4,
4,
4,
5,
5,
5,
5,
6,
6,
6,
6,
7,
7,
7,
7,
8,
8,
8,
9,
9,
9,
10,
10,
10,
11,
11,
11,
12,
12,
13,
13,
13,
14,
14,
15,
15,
16,
16,
17,
17,
18,
18,
19,
19,
20,
20,
21,
21,
22,
22,
23,
24,
24,
25,
25,
26,
27,
27,
28,
29,
29,
30,
31,
32,
32,
33,
34,
35,
35,
36,
37,
38,
39,
39,
40,
41,
42,
43,
44,
45,
46,
47,
48,
49,
50,
50,
51,
52,
54,
55,
56,
57,
58,
59,
60,
61,
62,
63,
64,
66,
67,
68,
69,
70,
72,
73,
74,
75,
77,
78,
79,
81,
82,
83,
85,
86,
87,
89,
90,
92,
93,
95,
96,
98,
99,
101,
102,
104,
105,
107,
109,
110,
112,
114,
115,
117,
119,
120,
122,
124,
126,
127,
129,
131,
133,
135,
137,
138,
140,
142,
144,
146,
148,
150,
152,
154,
156,
158,
160,
162,
164,
167,
169,
171,
173,
175,
177,
180,
182,
184,
186,
189,
191,
193,
196,
198,
200,
203,
205,
208,
210,
213,
215,
218,
220,
223,
225,
228,
231,
233,
236,
239,
241,
244,
247,
249,
252,
255,
]
_sync = bytearray(
[7, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127]
)
_empty = [0, 0, 0]
_empty_map = [
[_empty, _empty, _empty, _empty, _empty, _empty, _empty],
[_empty, _empty, _empty, _empty, _empty, _empty, _empty],
[_empty, _empty, _empty, _empty, _empty, _empty, _empty],
[_empty, _empty, _empty, _empty, _empty, _empty, _empty],
[_empty, _empty, _empty, _empty, _empty, _empty, _empty],
[_empty, _empty, _empty, _empty, _empty, _empty, _empty],
[_empty, _empty, _empty, _empty, _empty, _empty, _empty],
]
_pixel_map = deepcopy(_empty_map)
#######################
# INTERNAL OPERATIONS #
#######################
def __initialise():
"""INTERNAL.
Initialise the matrix.
"""
global _initialised
global _serial_device
global _pixel_map
if not _initialised:
if configuration.mcu_enabled():
if not path.exists("/dev/serial0"):
err_str = "Could not find serial port - are you sure it's enabled?"
raise serialutil.SerialException(err_str)
logger.debug("Opening serial port...")
_serial_device = Serial("/dev/serial0", baudrate=250000, timeout=2)
if _serial_device.isOpen():
logger.debug("OK.")
else:
logger.info("Error: Failed to open serial port!")
exit()
_initialised = True
else:
logger.error("Error: pi-topPULSE not initialised by pi-topd")
def __signal_handler(signal, frame):
"""INTERNAL.
Handles signals from the OS to exit.
"""
logger.info("\nQuitting...")
stop()
off()
exit(0)
def __get_avg_color():
"""INTERNAL.
Get the average color of the matrix.
"""
total_rgb = [0, 0, 0]
avg_rgb = [0, 0, 0]
for x in range(_w):
for y in range(_h):
for c in range(3):
total_rgb[c] = total_rgb[c] + _pixel_map[x][y][c]
for i, val in enumerate(total_rgb):
avg_rgb[i] = int(round(val / (_w * _h)))
return avg_rgb
def __write(data):
"""INTERNAL.
Write data to the matrix.
"""
logger.debug(
"{s0:<4}{s1:<4}{s2:<4}{s3:<4}{s4:<4}{s5:<4}{s6:<4}{s7:<4}{s8:<4}{s9:<4}{s10:<4}".format(
s0=data[0],
s1=data[1],
s2=data[2],
s3=data[3],
s4=data[4],
s5=data[5],
s6=data[6],
s7=data[7],
s8=data[8],
s9=data[9],
s10=data[10],
)
)
_serial_device.write(data)
sleep(0.002)
def __get_gamma_corrected_value(original_value):
"""INTERNAL.
Converts a brightness value from 0-255 to the value that produces an
approximately linear scaling to the human eye.
"""
return _gamma_correction_arr[original_value]
def __scale_pixel_to_brightness(original_value):
"""INTERNAL.
Multiplies intended brightness of a pixel by brightness scaling
factor to generate an adjusted value.
"""
unrounded_new_brightness = original_value * _brightness
rounded_new_brightness = round(unrounded_new_brightness)
int_new_brightness = int(rounded_new_brightness)
return int_new_brightness
def __get_rotated_pixel_map():
"""INTERNAL.
Get a rotated copy of the current in-memory pixel map.
"""
rotated_pixel_map = deepcopy(_pixel_map)
# Some fancy maths to rotate pixel map so that
# 0,0 (x,y) - with rotation 0 - is the bottom left LED
scaled_rotation = int(_rotation / 90)
adjusted_scaled_rotation = scaled_rotation + 1
modulo_adjusted_scaled_rotation = adjusted_scaled_rotation % 4
count = (6 - modulo_adjusted_scaled_rotation) % 4
for x in range(count):
rotated_pixel_map = list(zip(*rotated_pixel_map[::-1]))
return rotated_pixel_map
def __brightness_correct(original_value):
"""INTERNAL.
Correct a single color for brightness.
"""
brightness_scaled = __scale_pixel_to_brightness(original_value)
new_value = __get_gamma_corrected_value(brightness_scaled)
return new_value
def __adjust_r_g_b_for_brightness_correction(r, g, b):
"""INTERNAL.
Correct LED for brightness.
"""
r = __brightness_correct(r)
g = __brightness_correct(g)
b = __brightness_correct(b)
return r, g, b
def __sync_with_device():
"""INTERNAL.
Send the sync frame to tell the device that LED data is expected.
"""
__initialise()
logger.debug("Sync data:")
__write(_sync)
def __rgb_to_bytes_to_send(rgb):
"""INTERNAL.
Format the LED data in the device-specific layout.
"""
# Create three 5-bit color vals, splitting the green bits
# into two parts (hardware spec):
# |XX|G0|G1|R0|R1|R2|R3|R4|
# |G2|G3|G4|B0|B1|B2|B3|B4|
r = rgb[0]
g = rgb[1]
b = rgb[2]
byte0 = (r >> 3) & 0x1F
byte1 = (b >> 3) & 0x1F
grnb0 = (g >> 1) & 0x60
grnb1 = (g << 2) & 0xE0
byte0 = (byte0 | grnb0) & 0xFF
byte1 = (byte1 | grnb1) & 0xFF
return byte0, byte1
def __timer_method():
"""INTERNAL.
Run by the timer on each tick.
"""
global _running
global _update_rate
while _running:
show()
sleep(_update_rate)
def __flip(direction):
"""INTERNAL.
Flip the pixel map.
"""
global _pixel_map
flipped_pixel_map = deepcopy(_pixel_map)
for x in range(_w):
for y in range(_h):
if direction == "h":
flipped_pixel_map[x][y] = _pixel_map[(_w - 1) - x][y]
elif direction == "v":
flipped_pixel_map[x][y] = _pixel_map[x][(_h - 1) - y]
else:
err = "Flip direction must be [h]orizontal or [v]ertical only"
raise ValueError(err)
_pixel_map = flipped_pixel_map
def __set_show_state(enabled):
"""INTERNAL."""
global _show_enabled
_show_enabled = enabled
if not _show_enabled:
_temp_disable_t.start()
def __enable_show_state():
"""INTERNAL."""
__set_show_state(True)
def __disable_show_state():
"""INTERNAL."""
__set_show_state(True)
#######################
# EXTERNAL OPERATIONS #
#######################
[docs]
def set_debug_print_state(debug_enable):
"""Enable/disable debug prints."""
global _debug
_debug = debug_enable
[docs]
def brightness(new_brightness):
"""Set the display brightness between 0.0 and 1.0.
:param new_brightness: Brightness from 0.0 to 1.0 (default 1.0)
"""
global _brightness
if new_brightness > 1 or new_brightness < 0:
raise ValueError("Brightness level must be between 0 and 1")
_brightness = new_brightness
[docs]
def get_brightness():
"""Get the display brightness value.
Returns a float between 0.0 and 1.0.
"""
return _brightness
[docs]
def rotation(new_rotation=0):
"""Set the display rotation.
:param new_rotation: Specify the rotation in degrees: 0, 90, 180 or
270
"""
global _rotation
if new_rotation in [0, 90, 180, 270]:
_rotation = new_rotation
return True
else:
raise ValueError("Rotation: 0, 90, 180 or 270 degrees only")
[docs]
def flip_h():
"""Flips the grid horizontally."""
__flip("h")
[docs]
def flip_v():
"""Flips the grid vertically."""
__flip("v")
[docs]
def get_shape():
"""Returns the shape (width, height) of the display."""
return (_w, _h)
[docs]
def get_pixel(x, y):
"""Get the RGB value of a single pixel.
:param x: Horizontal position from 0 to 7
:param y: Veritcal position from 0 to 7
"""
global _pixel_map
return _pixel_map[y][x]
[docs]
def set_pixel(x, y, r, g, b):
"""Set a single pixel to RGB color.
:param x: Horizontal position from 0 to 7
:param y: Veritcal position from 0 to 7
:param r: Amount of red from 0 to 255
:param g: Amount of green from 0 to 255
:param b: Amount of blue from 0 to 255
"""
global _pixel_map
new_r, new_g, new_b = __adjust_r_g_b_for_brightness_correction(r, g, b)
_pixel_map[y][x] = [new_r, new_g, new_b]
[docs]
def set_all(r, g, b):
"""Set all pixels to a specific color."""
global _pixel_map
for x in range(_w):
for y in range(_h):
new_r, new_g, new_b = __adjust_r_g_b_for_brightness_correction(r, g, b)
_pixel_map[x][y][0] = new_r
_pixel_map[x][y][1] = new_g
_pixel_map[x][y][2] = new_b
[docs]
def show():
"""Update pi-topPULSE with the contents of the display buffer."""
global _pixel_map
global _rotation
global _show_enabled
wait_counter = 0
attempt_to_show_early = not _show_enabled
if attempt_to_show_early:
logger.info("Can't update pi-topPULSE LEDs more than 50/s. Waiting...")
pause_length = 0.001
# Scale wait time to _max_freq
wait_counter_length = ceil(float(1 / float(_max_freq * pause_length)))
while not _show_enabled:
if wait_counter >= wait_counter_length:
# Timer hasn't reset for some reason - force override
__enable_show_state()
break
else:
sleep(pause_length)
wait_counter = wait_counter + 1
if attempt_to_show_early:
logger.debug("pi-topPULSE LEDs re-enabled.")
__sync_with_device()
rotated_pixel_map = __get_rotated_pixel_map()
avg_rgb = __get_avg_color()
__initialise()
logger.debug("LED data:")
# For each col
for x in range(_w):
# Write col to LED matrix
# Start with col no., so LED matrix knows which one it belongs to
pixel_map_buffer = chr(x)
# Get col's frame buffer, iterating over each pixel
for y in range(_h + 1):
if y == _h:
# Ambient lighting bytes
byte0, byte1 = __rgb_to_bytes_to_send(avg_rgb)
else:
byte0, byte1 = __rgb_to_bytes_to_send(rotated_pixel_map[x][y])
pixel_map_buffer += chr(byte0)
pixel_map_buffer += chr(byte1)
# Write col to LED matrix
arr = bytearray(pixel_map_buffer, "Latin_1")
__write(arr)
# Prevent another write if it's too fast
__disable_show_state()
[docs]
def clear():
"""Clear the buffer."""
global _pixel_map
_pixel_map = deepcopy(_empty_map)
[docs]
def off():
"""Clear the buffer and immediately update pi-topPULSE."""
clear()
show()
[docs]
def run_tests():
"""Runs a series of tests to check the LED board is working as expected."""
off()
# ------------------------------
# Pixels
# ------------------------------
counter = 0
for r in range(4):
rotation(90 * r)
for x in range(_w):
for y in range(_h):
rad = radians((float(counter) / (4 * _w * _h)) * 360)
r = int((sin(rad) * 127) + 127)
g = int((cos(rad) * 127) + 127)
b = 255 - int((sin(rad) * 127) + 127)
set_pixel(x, y, r, g, b)
show()
sleep(0.05)
counter = counter + 1
off()
sleep(0.2)
# ------------------------------
# Rows and rotation
# ------------------------------
for r in range(4):
rotation(90 * r)
for c in range(3):
for x in range(_w):
for y in range(_h):
set_pixel(
x,
y,
255 if c == 0 else 0,
255 if c == 1 else 0,
255 if c == 2 else 0,
)
show()
sleep(0.05)
off()
sleep(0.2)
# ------------------------------
# Brightness
# ------------------------------
for b in range(100):
brightness(float(b) / 100)
set_all(255, 255, 255)
show()
sleep(0.01)
for b in range(100):
brightness(1 - (float(b) / 100))
set_all(255, 255, 255)
show()
sleep(0.01)
off()
brightness(1.0)
sleep(0.2)
# ------------------------------
# Flipping
# ------------------------------
for x in range(int(_w / 2)):
for y in range(int(_h / 2)):
set_pixel(x, y, 255, 255, 255)
set_pixel(int(_w / 4), int(_h / 4), 0, 255, 0)
show()
sleep(0.5)
for f in range(4):
for x in range(2):
if x == 0:
flip_h()
else:
flip_v()
show()
sleep(0.5)
off()
sleep(0.2)
# ------------------------------
# Conway - auto refresh
# ------------------------------
start(0.1)
life_map = [
[0, 0, 0, 0, 0, 0, 0],
[0, 1, 0, 0, 0, 0, 0],
[0, 0, 1, 0, 0, 0, 0],
[1, 1, 1, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0],
]
for r in range(40):
temp_map = deepcopy(life_map)
for x in range(_w):
for y in range(_h):
current_cell = temp_map[x][y]
neighbours = 0
neighbours = neighbours + temp_map[(x - 1) % _w][(y - 1) % _h]
neighbours = neighbours + temp_map[(x - 1) % _w][(y - 0) % _h]
neighbours = neighbours + temp_map[(x - 1) % _w][(y + 1) % _h]
neighbours = neighbours + temp_map[(x - 0) % _w][(y - 1) % _h]
neighbours = neighbours + temp_map[(x - 0) % _w][(y + 1) % _h]
neighbours = neighbours + temp_map[(x + 1) % _w][(y - 1) % _h]
neighbours = neighbours + temp_map[(x + 1) % _w][(y - 0) % _h]
neighbours = neighbours + temp_map[(x + 1) % _w][(y + 1) % _h]
if current_cell == 1 and (neighbours < 2 or neighbours > 3):
life_map[x][y] = 0
if current_cell == 0 and neighbours == 3:
life_map[x][y] = 1
for x in range(_w):
for y in range(_h):
if life_map[x][y] == 1:
set_pixel(x, y, 255, 255, 0)
else:
set_pixel(x, y, 0, 128, 0)
sleep(0.1)
stop()
off()
[docs]
def start(new_update_rate=0.1):
"""Starts a timer to automatically refresh the LEDs."""
global _update_rate
global _running
global _auto_refresh_timer
if new_update_rate < (1 / _max_freq):
_update_rate = 1 / _max_freq
else:
_update_rate = new_update_rate
_running = True
_auto_refresh_timer.start()
[docs]
def stop():
"""Stops the timer that automatically refreshes the LEDs."""
global _running
global _auto_refresh_timer
_running = False
_auto_refresh_timer.cancel()
##################
# INITIALISATION #
##################
_signal = signal.signal(signal.SIGINT, __signal_handler)
_auto_refresh_timer = Timer(_update_rate, __timer_method)
_temp_disable_t = Timer(_max_freq, __enable_show_state)
clear()