Source code for pitop.pulse.microphone

import logging
import signal
from binascii import hexlify, unhexlify
from os import close, path, remove, rename, stat
from struct import pack
from sys import exit
from tempfile import mkstemp
from threading import Thread
from time import sleep

import serial

from pitop.pulse import configuration

logger = logging.getLogger(__name__)

_bitrate = 8
_continue_writing = False
_recording_thread = False
_thread_running = False
_exiting = False
_temp_file_path = ""

#######################
# INTERNAL OPERATIONS #
#######################


def __signal_handler(signal, frame):
    """INTERNAL.

    Handles signals from the OS.
    """

    global _exiting

    if _exiting is False:
        _exiting = True

        if _thread_running is True:
            stop()

    logger.info("\nQuitting...")
    exit(0)


def __get_size(filename):
    """INTERNAL.

    Gets the size of a file.
    """

    file_stats = stat(filename)
    return file_stats.st_size


def __from_hex(value):
    """INTERNAL.

    Gets a bytearray from hex data.
    """

    return bytearray.fromhex(value)


def __space_separated_little_endian(integer_value, byte_len):
    """INTERNAL.

    Get an integer in format for WAV file header.
    """

    if byte_len <= 1:
        pack_type = "<B"
    elif byte_len <= 2:
        pack_type = "<H"
    elif byte_len <= 4:
        pack_type = "<I"
    elif byte_len <= 8:
        pack_type = "<Q"
    else:
        logger.info("Value cannot be represented in 8 bytes - exiting")
        exit()

    hex_string = pack(pack_type, integer_value)
    temp = hexlify(hex_string).decode()
    return " ".join([temp[i : i + 2] for i in range(0, len(temp), 2)])


def __init_header_information():
    """INTERNAL.

    Create a WAV file header.
    """

    RIFF = "52 49 46 46"
    WAVE = "57 41 56 45"
    fmt = "66 6d 74 20"
    DATA = "64 61 74 61"

    if configuration.microphone_sample_rate_is_22khz():
        capture_sample_rate = 22050
    else:
        capture_sample_rate = 16000

    # ChunkID
    header = __from_hex(RIFF)
    # ChunkSize - 4 bytes (to be changed depending on length of data...)
    header += __from_hex(__space_separated_little_endian(0, 4))
    # Format
    header += __from_hex(WAVE)
    # Subchunk1ID
    header += __from_hex(fmt)
    # Subchunk1Size (PCM = 16)
    header += __from_hex(__space_separated_little_endian(16, 4))
    # AudioFormat   (PCM = 1)
    header += __from_hex(__space_separated_little_endian(1, 2))
    header += __from_hex(__space_separated_little_endian(1, 2))  # NumChannels
    # SampleRate
    header += __from_hex(__space_separated_little_endian(capture_sample_rate, 4))
    # ByteRate (Same as SampleRate due to 1 channel, 1 byte per sample)
    header += __from_hex(__space_separated_little_endian(capture_sample_rate, 4))
    # BlockAlign - (no. of bytes per sample)
    header += __from_hex(__space_separated_little_endian(1, 2))
    # BitsPerSample
    header += __from_hex(__space_separated_little_endian(_bitrate, 2))
    # Subchunk2ID
    header += __from_hex(DATA)
    # Subchunk2Size - 4 bytes (to be changed depending on length of data...)
    header += __from_hex(__space_separated_little_endian(0, 4))

    return header


def __update_header_in_file(file, position, value):
    """INTERNAL.

    Update the WAV header
    """

    hex_value = __space_separated_little_endian(value, 4)
    data = unhexlify("".join(hex_value.split()))

    file.seek(position)
    file.write(data)


def __finalise_wav_file(file_path):
    """INTERNAL.

    Update the WAV file header with the size of the data.
    """

    size_of_data = __get_size(file_path) - 44

    if size_of_data <= 0:
        logger.info("Error: No data was recorded!")
        remove(file_path)
    else:
        with open(file_path, "rb+") as file:
            logger.debug("Updating header information...")

            __update_header_in_file(file, 4, size_of_data + 36)
            __update_header_in_file(file, 40, size_of_data)


def __thread_method():
    """INTERNAL.

    Thread method.
    """

    __record_audio()


def __record_audio():
    """INTERNAL.

    Open the serial port and capture audio data into a temp file.
    """

    global _temp_file_path

    temp_file_tuple = mkstemp()
    close(temp_file_tuple[0])
    _temp_file_path = temp_file_tuple[1]

    if path.exists("/dev/serial0"):
        logger.debug("Opening serial device...")

        serial_device = serial.Serial(
            port="/dev/serial0",
            timeout=1,
            baudrate=250000,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            bytesize=serial.EIGHTBITS,
        )
        serial_device_open = serial_device.isOpen()

        if serial_device_open is True:
            try:
                logger.debug("Start recording")

                with open(_temp_file_path, "wb") as file:
                    logger.debug("WRITING: initial header information")
                    file.write(__init_header_information())

                    if serial_device.inWaiting():
                        logger.debug("Flushing input and starting from scratch")
                        serial_device.flushInput()

                    logger.debug("WRITING: wave data")

                    while _continue_writing:
                        while not serial_device.inWaiting():
                            sleep(0.01)

                        audio_output = serial_device.read(serial_device.inWaiting())
                        bytes_to_write = bytearray()

                        for pcm_data_block in audio_output:
                            if _bitrate == 16:
                                pcm_data_int = 0
                                pcm_data_int = pcm_data_block
                                scaled_val = int((pcm_data_int * 32768) / 255)
                                bytes_to_write += __from_hex(
                                    __space_separated_little_endian(scaled_val, 2)
                                )

                            else:
                                pcm_data_int = pcm_data_block
                                bytes_to_write += __from_hex(
                                    __space_separated_little_endian(pcm_data_int, 1)
                                )

                        file.write(bytes_to_write)

                        sleep(0.1)

            finally:
                serial_device.close()

                __finalise_wav_file(_temp_file_path)

                logger.debug("Finished Recording.")

        else:
            logger.info("Error: Serial port failed to open")

    else:
        logger.info("Error: Could not find serial port - are you sure it's enabled?")


#######################
# EXTERNAL OPERATIONS #
#######################


[docs] def record(): """Start recording on the pi-topPULSE microphone.""" global _thread_running global _continue_writing global _recording_thread if not configuration.mcu_enabled(): logger.info("Error: pi-topPULSE is not initialised.") exit() if _thread_running is False: _thread_running = True _continue_writing = True _recording_thread = Thread(group=None, target=__thread_method, daemon=True) _recording_thread.start() else: logger.info("Microphone is already recording!")
[docs] def is_recording(): """Returns recording state of the pi-topPULSE microphone.""" return _thread_running
[docs] def stop(): """Stops recording audio.""" global _thread_running global _continue_writing _continue_writing = False _recording_thread.join() _thread_running = False
[docs] def save(file_path, overwrite=False): """Saves recorded audio to a file.""" global _temp_file_path if _thread_running is False: if _temp_file_path != "" and path.exists(_temp_file_path): if path.exists(file_path) is False or overwrite is True: if path.exists(file_path): remove(file_path) rename(_temp_file_path, file_path) _temp_file_path = "" else: logger.info("File already exists") else: logger.info("No recorded audio data found") else: logger.info("Microphone is still recording!")
[docs] def set_sample_rate_to_16khz(): """Set the appropriate I2C bits to enable 16,000Hz recording on the microphone.""" configuration.set_microphone_sample_rate_to_16khz()
[docs] def set_sample_rate_to_22khz(): """Set the appropriate I2C bits to enable 22,050Hz recording on the microphone.""" configuration.set_microphone_sample_rate_to_22khz()
[docs] def set_bit_rate_to_unsigned_8(): """Set bitrate to device default.""" global _bitrate _bitrate = 8
[docs] def set_bit_rate_to_signed_16(): """Set bitrate to double that of device default by scaling the signal.""" global _bitrate _bitrate = 16
####################### # INITIALISATION # ####################### _signal = signal.signal(signal.SIGINT, __signal_handler)