Multithreading on the AIR-T

In this tutorial we will demonstrate how to create a background thread for transmitting on the AIR-T using AirStack. We will modify the Transmitting on the AIR-T tutorial to make the transmitting a background task. To accomplish this, we will leverage Python's concurrent.futures library to launch a background transmit task and the threading library to communicate with the background process.

Creating a Tone Signal

We will be using the same code base shown in the Transmitting on the AIR-T tutorial.

Transmit Subprocess

First, we will define a function that will repeat the tone signal indefinitely. We will pass the function the AIR-T SDR object, the stream object, the buffer length, and an event to monitor that will tell the subprocess to stop transmitting.

def tx_task_fn(sdr, tx_stream, tx_buff, buff_len, stop_tx_event):
    """ Sends same buffer to transmitter indefinitely in while loop until
    stop_tx_event is set """
    while not stop_tx_event.is_set():
        rc = sdr.writeStream(tx_stream, [tx_buff], buff_len)
        if rc.ret != buff_len:
            raise IOError('Tx Error {}: {}'.format(rc.ret, errToStr(rc.ret)))

From the code you can see that this function stays in the while loop until the stop_tx_event flag is set to True. We are going to define the stop_tx_event variable using the threading Python package so that the subprocess will have visibility into this variable.

AIR-T Initialization

We will define the baseband frequency of the tone (bb_freq) and the AIR-T's LO frequency (lo_freq) so that the RF tone frequency is the sum of the two, i.e., freq = bb_freq + lo_freq.

# Generate tone buffer that can be repeated without phase discontunity
bb_freq = fs / 8  # baseband frequency of tone. 8 samples per period
tx_buff = make_tone(buff_len, bb_freq, fs)
lo_freq = freq - bb_freq  # Calc LO freq to put tone at tone_rf

The next thing we need to do is to setup the transmitter on the radio and activate the stream. We will do this the same way that we have been doing in every other tutorial.

# Setup Radio
sdr = SoapySDR.Device()  # Create AIR-T instance
sdr.setSampleRate(SOAPY_SDR_TX, chan, fs)  # Set sample rate
sdr.setFrequency(SOAPY_SDR_TX, chan, lo_freq)  # Tune the LO
sdr.setGain(SOAPY_SDR_TX, chan, gain)

tx_stream = sdr.setupStream(SOAPY_SDR_TX, SOAPY_SDR_CS16, [chan])
sdr.activateStream(tx_stream)  # this turns the radio on

At this point the transmitter stream is activated and the radio is waiting to be sent signal data.

Start Transmit Subprocess

Now that the transmitter is waiting for signal data to be sent, we will create a subprocess that will launch the tx_task_fn in the background to send data to the transmitter stream.

# Setup thread subclass to asynchronously execute transmit task
tx = concurrent.futures.ThreadPoolExecutor(max_workers=1)
# Create event that we can call to stop the transmit task running in background
tx_stop_event = threading.Event()
# Launch transmit task function to begin transmitting
tx_task = tx.submit(tx_task_fn, sdr, tx_stream, tx_buff, buff_len, tx_stop_event)
# Add callback function that will deactivate the TX stream once transmit finishes
tx_task.add_done_callback(lambda task: sdr.deactivateStream(tx_stream))

Once the tx.submit() process is started, the AIR-T is executing the tx_task_fn in the background and therefore signal is being transmitted. This will happen indefinitely until the user kills the process by executing tx_stop_event.set(). To check if the process is done, the user can call the tx_task.done() command which will return True or False depending on if the process has completed .

Main Process

Ultimately, we have placed the transmit task in a background process so that the main loop can run independently of transmit process. Since this is the first introduction to multithreading on the AIR-T, we will a simple timer with a print statement as the process in the main loop.

# Transmit
nsec = 0
while not tx_task.done():
    try:
        # You can put a receive process or another application here while the
        # transmit task runs in the background. Instead, we will just sleep for
        # 5 seconds and display a timer.
        time.sleep(5)
        nsec += 5
        print('Now transmitting for {} seconds'.format(nsec), end="\r")
    except KeyboardInterrupt:
        tx_stop_event.set()  # Stop transmitting
        tx_task.result(timeout=1.0)  # wait up to 1 sec for transmitter to stop
        break

# Stop streaming
sdr.closeStream(tx_stream)

The while loop above will run continuously until either the tx_task has an error or if the user presses ctrl-c. When the user presses ctrl-c, the code stops the transmit subprocess by setting tx_stop_event and waiting up to one second for it to finish. We then close the stream.

Conclusion

This tutorial provides a basic introduction to creating background threads on the AIR-T with AirStack. The full code is shown in the next section and can be used on any AIR-T that is running AirStack 0.4.0+. Deepwave recommends using the concurrent.futures Python package to create a subprocess for critical transmit tasks. In future tutorials, we will build upon this methodology to send data to the transmitter subtask based on events that are seen in the receiver.


Complete Code

#!/usr/bin/env python3
""" Transmits a tone out of the AIR-T. The script will create a tone segment that
is infinity repeatable without a phase discontinuity and with 8 samples per
period. The TX LO of the AIR-T is set such that the baseband frequency of the
generated tone plus the LO frequency will transmit at the desired RF. """
import sys
import numpy as np
import argparse
import SoapySDR
from SoapySDR import SOAPY_SDR_TX, SOAPY_SDR_CS16, errToStr
import concurrent.futures
import threading
import time


def make_tone(n, fcen, fs, phi=0.285):
    """ Generates tone signal window with a frequency that is an integer
    multiple of the sample rate so it can be repeated without a phase
    discontinuity. """
    period = fs / fcen
    assert n % period == 0, 'Total samples not integer number of periods'
    a = 2**15
    # Make Complex Valued Tone Signal
    wt = np.array(2 * np.pi * fcen * np.arange(n) / fs)
    sig_cplx = np.exp(1j * (wt + phi))
    # Convert to interleaved int16 values
    sig_int16 = np.empty(2 * n, dtype=np.int16)
    sig_int16[0::2] = 32767 * sig_cplx.real
    sig_int16[1::2] = 32767 * sig_cplx.imag
    return sig_int16


def tx_task_fn(sdr, tx_stream, tx_buff, buff_len, stop_tx_event):
    """ Sends same buffer to transmitter indefinitely in while loop until
    stop_tx_event is set """
    while not stop_tx_event.is_set():
        rc = sdr.writeStream(tx_stream, [tx_buff], buff_len)
        if rc.ret != buff_len:
            raise IOError('Tx Error {}: {}'.format(rc.ret, errToStr(rc.ret)))


def transmit_tone(freq, chan=0, fs=31.25, gain=-20, buff_len=16384):
    """ Transmit a tone out of the AIR-T """
    # Generate tone buffer that can be repeated without phase discontunity
    bb_freq = fs / 8  # baseband frequency of tone
    tx_buff = make_tone(buff_len, bb_freq, fs)
    lo_freq = freq - bb_freq  # Calc LO freq to put tone at tone_rf

    # Setup Radio
    sdr = SoapySDR.Device()  # Create AIR-T instance
    sdr.setSampleRate(SOAPY_SDR_TX, chan, fs)  # Set sample rate
    sdr.setFrequency(SOAPY_SDR_TX, chan, lo_freq)  # Tune the LO
    sdr.setGain(SOAPY_SDR_TX, chan, gain)

    tx_stream = sdr.setupStream(SOAPY_SDR_TX, SOAPY_SDR_CS16, [chan])
    sdr.activateStream(tx_stream)  # this turns the radio on

    # Setup thread subclass to asynchronously execute transmit task
    tx = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    # Create event that we can call to stop the transmit task running in background
    tx_stop_event = threading.Event()
    # Launch transmit task function to begin transmitting
    tx_task = tx.submit(tx_task_fn, sdr, tx_stream, tx_buff, buff_len, tx_stop_event)
    # Add callback function that will deactivate the TX stream once transmit finishes
    tx_task.add_done_callback(lambda task: sdr.deactivateStream(tx_stream))

    # Transmit
    nsec = 0
    while not tx_task.done():
        try:
            # You can put a receive process or another application here while the
            # transmit task runs in the background. Instead, we will just sleep for
            # 5 seconds and display a timer.
            time.sleep(5)
            nsec += 5
            print('Now transmitting for {} seconds'.format(nsec), end="\r")
        except KeyboardInterrupt:
            tx_stop_event.set()  # Stop transmitting
            tx_task.result(timeout=1.0)  # wait up to a second for transmitter to stop
            break

    # Stop streaming
    sdr.closeStream(tx_stream)


def parse_command_line_arguments():
    """ Create command line options for transmit function """
    help_formatter = argparse.ArgumentDefaultsHelpFormatter
    parser = argparse.ArgumentParser(description='Transmit a tone on the AIR-T',
                                     formatter_class=help_formatter)
    parser.add_argument('-f', type=float, required=False, dest='freq',
                        default=1000e6, help='TX Tone Frequency')
    parser.add_argument('-c', type=int, required=False, dest='chan',
                        default=0, help='TX Channel Number [0 or 1]')
    parser.add_argument('-s', type=float, required=False, dest='fs',
                        default=31.25e6, help='TX Sample Rate')
    parser.add_argument('-g', type=float, required=False, dest='gain',
                        default=0, help='TX gain')
    parser.add_argument('-n', type=int, required=False, dest='buff_len',
                        default=16384, help='TX Buffer Size')
    return parser.parse_args(sys.argv[1:])


if __name__ == '__main__':
    pars = parse_command_line_arguments()
    transmit_tone(pars.freq, pars.chan, pars.fs, pars.gain, pars.buff_len)

Last update: August 11, 2020