Creating a Repeater with cuSignal and the AIR-T

The goal of this tutorial is to combine the previous tutorials into a real-world example that continuously receives signals from the AIR-T, performs detection using the GPU, and repeats any signal that passes the detector's threshold. The steps in the process are:

  • Step 1: Create a GPU power detector using cuSignal
  • Step 2: Start a transmit task that sends any signal data array that is passed in to the AIR-T's RF transmitter
  • Step 3: Start the transceiver and continuously receive signal data
  • Step 4: Repeat detected signals by sending them to the transmit task


NOTE: IF YOU CHOOSE TO USE YOUR PRODUCT TO TRANSMIT USING AN ANTENNA, IT IS YOUR RESPONSIBILITY TO MAKE SURE THAT YOU ARE IN COMPLIANCE WITH ALL LAWS FOR THE COUNTRY, FREQUENCY, AND POWER LEVELS IN WHICH THE DEVICE IS USED. IT IS THE RESPONSIBILITY OF THE USER TO MAINTAIN COMPLIANCE WITH ALL LOCAL LAWS AND REGULATIONS. DEEPWAVE DIGITAL IS NOT RESPONSIBLE FOR ANY LAWSUITS, FINES, OR DAMAGES THAT VIOLATE THESE LAWS OR POLICIES.

Compatibility

This tutorial code will run on AIR-T hardware running AirStack 0.4.0 or later. The code may execute on AirStack 0.3.0 with limited performance. The code will not run on AirStack 0.2.x or earlier.

Prerequisites

In this tutorial, we will build upon a few of the earlier tutorials. If you have not already done so, we recommend going through the following tutorials for prerequisites:

Step 1: Create a GPU Based Power Detector

The first step of this tutorial is to create a PowerDetector class in Python that will:

  1. Calculate the instantaneous power envelope of a complex-valued input signal
  2. Apply a finite impulse response (FIR) low-pass filter to the signal's power envelope
  3. Calculate the average background power level and store that value in a FIFO buffer that will be used to continuously update a detection threshold
  4. Calculate the number of samples within the current array of signal data that are above the calculated threshold.
  5. If the calculated number of samples is above a configurable samp_above_thresh setting, declare that a signal has been detected.
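
As a rough illustration of the five steps above, the sketch below runs the same sequence on the CPU with NumPy only, so it can be tried without an AIR-T or a GPU. The helper name detect_cpu, the boxcar filter taps, and the fixed bkg_avg argument are assumptions made for this example; the PowerDetector class described in this tutorial uses CuPy arrays, a firwin filter design, and a FIFO-based background average instead.

```python
import numpy as np


def detect_cpu(x, taps, bkg_avg, thresh_db=5.0, samp_above_thresh=20):
    """NumPy stand-in for the detector steps (hypothetical helper)."""
    # 1. Envelope of the complex-valued input (magnitude of each sample)
    envelope = np.abs(x)
    # 2. Low-pass filter the envelope; mode='same' keeps the input length
    envelope = np.convolve(envelope, taps, mode='same')
    # 3. Threshold = background average scaled by the dB offset
    #    (the background level is passed in here instead of using a FIFO)
    thresh = bkg_avg * 10 ** (thresh_db / 10)
    # 4. Find and count the samples above the threshold
    above = envelope > thresh
    n_detections = np.count_nonzero(above)
    # 5. Declare a detection only if enough samples are above the threshold
    if n_detections > samp_above_thresh:
        return np.where(above, x, 0)  # Zero out samples below threshold
    return None


# Synthetic test data: complex noise, and the same noise with a strong burst
rng = np.random.default_rng(0)
noise = 0.01 * (rng.standard_normal(4096) + 1j * rng.standard_normal(4096))
burst = noise.copy()
burst[1000:1500] += 1.0

taps = np.ones(65) / 65             # Boxcar low-pass filter (assumption)
bkg_avg = np.mean(np.abs(noise))    # Background level measured from noise

no_det = detect_cpu(noise, taps, bkg_avg)   # Noise only: no detection
det = detect_cpu(burst, taps, bkg_avg)      # Burst present: detection
```

With this data, no_det is None, while det is an array of the same length as the input in which the samples away from the burst have been set to zero.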

Initializing the PowerDetector Class
The constructor for the PowerDetector class will configure the detector using the detection threshold (specified in decibels above the average background power level), the number of filter taps, the normalized filter cutoff frequency, and the number of samples above threshold required to declare a detection. Additionally, the input buffer is provided here. This detector assumes that all data buffers provided will be the same length and runs the detection process a single time during the constructor. This pre-compiles several GPU kernels and sets up the digital filtering for the provided buffer size and data type, so that the first call to detect() in the processing loop does not take significantly more time to execute.

In order to efficiently perform the low-pass filtering, we will calculate the filter coefficients at initialization rather than each time the detect method is called. Additionally, we will preallocate all GPU arrays (using CuPy) and create a first-in-first-out (FIFO) CPU array that will be used to keep track of the average background level for the previous 100 windows. This array is used to compute a cheap moving average on the CPU. The result of this moving average is a dynamic threshold that will adjust based on the spectral environment.

The implementation of the constructor is shown below.

def __init__(self, buff, thresh_db, ntaps=65, cutoff=0.02, samp_above_thresh=20):
    self._thresh_offset = 10 ** (thresh_db / 10)  # Convert thresh to linear
    self._thresh = float('inf')
    self._samp_above_thresh = samp_above_thresh

    # Calculate filter coefficients
    filt_coef = fir_filter_design.firwin(ntaps, cutoff, window=('kaiser', 0.5))
    group_delay = int(ntaps / 2)
    self._buff_len = len(buff)
    # cusignal filter returns array w/ padding so define index for signal ROI
    self._filter_roi = group_delay + cp.arange(self._buff_len, dtype=int)

    # Preallocate cupy arrays
    self._win = cp.asarray(filt_coef, dtype=cp.float32)
    self._envelope = cp.zeros(self._buff_len, dtype=cp.float32)
    self._seg_det_index = cp.zeros(self._buff_len, dtype=bool)

    # Create fifo for continuous threshold calculation
    self._fifo_len = 100
    self._fifo_index = 0
    self._bkg_sum_arr = np.full(self._fifo_len, np.inf)

    # Run detector one time to compile the CUDA kernels
    self.detect(buff)
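
The behavior of the threshold FIFO can be explored in isolation. The sketch below is a NumPy-only stand-in (the BackgroundFifo class and its small sizes are hypothetical, chosen for illustration) that mirrors the update that detect() performs: each call overwrites the oldest entry with the sum of the current envelope, and the threshold is the FIFO average scaled by the linear offset.

```python
import numpy as np


class BackgroundFifo:
    """Hypothetical helper mirroring the detector's threshold FIFO."""

    def __init__(self, fifo_len, buff_len, thresh_db):
        self._sums = np.full(fifo_len, np.inf)   # Every slot starts at inf
        self._index = 0
        self._norm = fifo_len * buff_len
        self._offset = 10 ** (thresh_db / 10)    # Convert dB to linear

    def update(self, envelope):
        # Overwrite the oldest slot with the sum of the current envelope
        self._sums[self._index] = float(np.sum(envelope))
        self._index = (self._index + 1) % len(self._sums)
        # Average background level over the FIFO, scaled by the offset
        return np.sum(self._sums) / self._norm * self._offset


fifo = BackgroundFifo(fifo_len=4, buff_len=8, thresh_db=10)
thresholds = [fifo.update(np.ones(8)) for _ in range(5)]
```

Because every slot starts at infinity, the threshold stays infinite until the FIFO has been filled once. With the constructor above, this means no detection can be declared until detect() has been called 100 times, including the warm-up call made in the constructor.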


Performing the Detection
The first step in performing the power detection is to calculate the envelope of the complex-valued incoming signal, i.e., the absolute value of each sample. Following this calculation, the class applies a low-pass filter to the envelope using the upfirdn cuSignal function. Note that while upfirdn may be used to resample the signal, we are using it here without changing the data rate, which allows us to easily determine the exact start and end sample for each detected signal.
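
To see why the group-delay offset keeps detections aligned with the input samples, the sketch below filters a single impulse on the CPU. It uses np.convolve in full mode as a stand-in for upfirdn with no rate change (for scipy.signal.upfirdn, which cuSignal's function is modeled on, the two give the same result at unit up/down factors), and a normalized Hamming window in place of the firwin design. Both substitutions are assumptions made so that the example needs only NumPy.

```python
import numpy as np

ntaps, buff_len = 65, 1024
taps = np.hamming(ntaps)
taps /= taps.sum()               # Unity DC gain; stand-in for firwin taps
group_delay = ntaps // 2         # Same value as int(ntaps / 2)

x = np.zeros(buff_len)
x[300] = 1.0                     # Single impulse at sample 300

# Full convolution: output is padded to buff_len + ntaps - 1 samples
full = np.convolve(taps, x)
# Index of the region of interest, shifted by the filter's group delay
roi = group_delay + np.arange(buff_len)
aligned = full[roi]

peak_padded = int(np.argmax(full))      # Delayed by group_delay samples
peak_aligned = int(np.argmax(aligned))  # Back at the original sample index
```

The impulse at sample 300 peaks at index 332 in the padded output and at index 300 after the ROI slice, and the sliced array has the same length as the input buffer.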

The next steps of the detector include calculating the average background power level, updating the FIFO, and determining the current threshold value. Once the threshold for the current window has been determined, we compare the power of each sample in the window to the threshold and determine if the number of samples above threshold is high enough to be considered a signal. If the detector determines that a signal is present (i.e., the number of samples above the threshold is more than 20 in the example), then the samples below the threshold are set to 0 and the resulting signal is returned. We zero the samples below the threshold so that we are not repeating receiver noise. If a signal is not detected, then None is returned and, as will be shown in a later section, nothing will be repeated.

def detect(self, x_in):
    # Compute the instantaneous power for the current buffer
    x_envelope = cp.abs(x_in)
    # Low-pass filter the envelope without changing the data rate
    self._envelope[:] = cusignal.upfirdn(self._win, x_envelope)[self._filter_roi]

    # Update threshold
    # Add summation of current envelope to the threshold fifo array
    self._bkg_sum_arr[self._fifo_index] = cp.sum(self._envelope)
    # Update fifo index for next detection window
    self._fifo_index = (self._fifo_index + 1) % self._fifo_len
    # Calculate average background power level for the previous buffers in fifo
    bkg_avg = np.sum(self._bkg_sum_arr) / (self._fifo_len * self._buff_len)
    # Calculate new threshold value
    self._thresh = bkg_avg * self._thresh_offset

    # Calc index vector where power is above the threshold
    envelope_det_idx = self._envelope > self._thresh
    n_detections = cp.sum(envelope_det_idx)
    # Make sure at least samp_above_thresh are higher than the threshold
    if n_detections > self._samp_above_thresh:
        # Copy to cupy array as workaround to issue cuSignal #178
        x_out = cp.array(x_in)
        x_out[~envelope_det_idx] = 0  # Zero out samples below threshold
    else:
        x_out = None
    return x_out

Step 2: Start a Transmit Task

The code in this part of the example provides the implementation for the background task that will be launched to send a signal array, tx_sig, to a transmit stream without blocking the receive processing. It will send the signal just once, print an asterisk when the buffer has been sent, and raise an IOError if the transmit call fails.

The detailed procedure for creating a transmit task was previously shown in the Multithreading on the AIR-T tutorial. Please refer to that tutorial for the complete details to design and use your own transmit tasks.

def tx_task_fn(sdr, tx_stream, tx_sig, tx_buff_len):
    """ Transmit task that can be made a background process """
    rc = sdr.writeStream(tx_stream, [tx_sig], tx_buff_len)
    if rc.ret != tx_buff_len:
        raise IOError('Tx Error {}:{}'.format(rc.ret, SoapySDR.errToStr(rc.ret)))
    print('*', end='', flush=True)  # print an asterisk when a signal is repeated
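
One caveat when running a task like tx_task_fn in an executor: an exception raised inside a worker thread is stored on the returned Future and is not printed unless something asks for it. The sketch below shows one way to surface such errors with a done-callback. It is hardware-free; fake_tx_task and report_tx_result are hypothetical names, and the tutorial code itself does not attach a callback.

```python
import concurrent.futures


def fake_tx_task(tx_sig, tx_buff_len):
    """Hypothetical stand-in for tx_task_fn that needs no radio hardware."""
    if len(tx_sig) != tx_buff_len:
        raise IOError('Tx Error: expected {} samples, got {}'.format(
            tx_buff_len, len(tx_sig)))
    return len(tx_sig)


errors = []


def report_tx_result(future):
    """Done-callback: collect any exception raised inside the worker."""
    exc = future.exception()
    if exc is not None:
        errors.append(exc)
        print('Transmit task failed: {}'.format(exc))


# Leaving the with-block waits for all queued tasks to finish
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as tx_executor:
    for sig in ([0.0] * 4, [0.0] * 3):   # Second buffer is too short
        fut = tx_executor.submit(fake_tx_task, sig, 4)
        fut.add_done_callback(report_tx_result)
```

Only the second submission fails, so errors holds a single IOError once the executor has shut down. Without the callback (or a call to fut.result()), that failure would go unnoticed.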

Step 3: Start the Transceiver

The receiver will be started the same way as in each of the previous tutorials. The transmitter task will be launched as it was in the Multithreading on the AIR-T tutorial. The only noticeable difference from the previous receiver tutorials is that the receiver memory buffer is created using the cusignal.get_shared_mem() function call instead of a NumPy array. Creating the memory buffer this way allows the detector to leverage the zero-copy feature built into the AIR-T's drivers. Once the code segment below has finished executing, the receiver will be filling the buffer with samples and the transmitter will be actively awaiting a signal array to transmit.

#  Initialize the AIR-T receiver, set sample rate, gain, and frequency
sdr = SoapySDR.Device()
sdr.setSampleRate(SOAPY_SDR_RX, pars.channel, pars.samp_rate)
if pars.rx_gain.lower() == 'agc':  # Turn on AGC
    sdr.setGainMode(SOAPY_SDR_RX, pars.channel, True)
else:  # set manual gain
    sdr.setGain(SOAPY_SDR_RX, pars.channel, float(pars.rx_gain))
sdr.setFrequency(SOAPY_SDR_RX, pars.channel, pars.freq)

#  Initialize the AIR-T transmitter, set sample rate, gain, and frequency
sdr.setSampleRate(SOAPY_SDR_TX, pars.channel, pars.samp_rate)
sdr.setGain(SOAPY_SDR_TX, pars.channel, float(pars.tx_gain))
sdr.setFrequency(SOAPY_SDR_TX, pars.channel, pars.freq)

# Create SDR shared memory buffer, detector
buff = cusignal.get_shared_mem(pars.buff_len, dtype=cp.complex64)
detr = PowerDetector(buff, pars.threshold)

# Turn on the transmitter
tx_stream = sdr.setupStream(SOAPY_SDR_TX, SOAPY_SDR_CF32, [pars.channel])
sdr.activateStream(tx_stream)
# Set up a thread pool to asynchronously execute transmit requests
tx_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

# Turn on the receiver
rx_stream = sdr.setupStream(SOAPY_SDR_RX, SOAPY_SDR_CF32, [pars.channel])
sdr.activateStream(rx_stream)

Step 4: Repeat Detected Signals

Now that all of the building blocks have been put in place and the transceiver streams are activated, we can place the reading, detection, and transmitting of samples in a continuous while loop. The while loop will read data, perform error checking, send the received signal buffer to the detector class, and repeat any detected signals. This while loop will continuously run until the user exits by pressing ctrl-c.

print('Looking for signals to repeat. Press ctrl-c to exit.')
while True:
    try:
        sr = sdr.readStream(rx_stream, [buff], pars.buff_len)  # Read data
        if sr.ret == SOAPY_SDR_OVERFLOW:  # Data was dropped
            print('O', end='', flush=True)
            continue
        detected_sig = detr.detect(buff)
        if detected_sig is not None:
            # AIR-T transmitter currently only accepts numpy arrays or lists
            tx_sig = cp.asnumpy(detected_sig)
            tx_executor.submit(tx_task_fn, sdr, tx_stream, tx_sig, pars.buff_len)
            detr.plot_envelope(buff)  # Plot the signal and envelope
    except KeyboardInterrupt:
        break

Conclusion

This tutorial provides a detailed example of how to leverage the AIR-T, AirStack, cuSignal, and the GPU to perform signal detection, filtering, and repeating. The complete implementation of the code described in this tutorial is shown below and can be used on any AIR-T that is running AirStack 0.4.0 or later.



Complete Code Listing

This code example is Open Source, made available under the terms of the BSD 3-Clause License. It may be found in the AirStack Examples repository on GitHub and is reproduced below.

#!/usr/bin/env python3
#
# Copyright 2020, Deepwave Digital, Inc.
# SPDX-License-Identifier: BSD-3-Clause

import sys
import argparse
import concurrent.futures
import SoapySDR
from SoapySDR import SOAPY_SDR_TX, SOAPY_SDR_RX
from SoapySDR import SOAPY_SDR_CF32, SOAPY_SDR_OVERFLOW
import cusignal
from cusignal.filter_design import fir_filter_design
import cupy as cp
import numpy as np
from matplotlib import pyplot as plt


def parse_command_line_arguments():
    help_formatter = argparse.ArgumentDefaultsHelpFormatter
    parser = argparse.ArgumentParser(description='Signal detector and repeater',
                                     formatter_class=help_formatter)
    parser.add_argument('-s', type=float, required=False, dest='samp_rate',
                        default=7.8128e6, help='Receiver sample rate in SPS')
    parser.add_argument('-t', type=int, required=False, dest='threshold',
                        default=5, help='Detection threshold above noise floor (dB)')
    parser.add_argument('-f', type=float, required=False, dest='freq',
                        default=315e6, help='Receiver tuning frequency in Hz')
    parser.add_argument('-c', type=int, required=False, dest='channel',
                        default=0, help='Receiver channel')
    parser.add_argument('-g', type=str, required=False, dest='rx_gain',
                        default='agc', help='Receive Gain value in dB')
    parser.add_argument('-G', type=float, required=False, dest='tx_gain',
                        default=0, help='Transmit Gain value in dB')
    parser.add_argument('-n', type=int, required=False, dest='buff_len',
                        default=32768, help='Data buffer size (complex samples)')
    return parser.parse_args(sys.argv[1:])


class PowerDetector:
    """ Real-time power detector class for finding signals with AIR-T"""
    def __init__(self, buff, thresh_db, ntaps=65, cutoff=0.02,
                 samp_above_thresh=20):
        self._thresh_offset = 10 ** (thresh_db / 10)  # Convert thresh to linear
        self._thresh = float('inf')
        self._samp_above_thresh = samp_above_thresh

        # Calculate filter coefficients
        filt_coef = fir_filter_design.firwin(ntaps, cutoff,
                                             window=('kaiser', 0.5))
        group_delay = int(ntaps / 2)
        self._buff_len = len(buff)
        # cusignal filter returns array w/ padding so define index for signal ROI
        self._filter_roi = group_delay + cp.arange(self._buff_len, dtype=int)

        # Preallocate cupy arrays
        self._win = cp.asarray(filt_coef, dtype=cp.float32)
        self._envelope = cp.zeros(self._buff_len, dtype=cp.float32)
        self._seg_det_index = cp.zeros(self._buff_len, dtype=bool)

        # Create fifo for continuous threshold calculation
        self._fifo_len = 100
        self._fifo_index = 0
        self._bkg_sum_arr = np.full(self._fifo_len, np.inf)

        # Run detector one time to compile the CUDA kernels
        self.detect(buff)

    def plot_envelope(self, x_in=None):
        """ Plot the envelope of the most recent signal. Note that this will cause
        samples to drop if called every loop"""
        plt.figure(1)
        if x_in is not None:
            plt.plot(x_in.real, 'k', label='Signal (Real)')
            plt.plot(x_in.imag, 'g', label='Signal (Imag)')
        plt.plot(cp.asnumpy(self._envelope), 'r', label='Envelope')
        plt.plot([0, self._buff_len-1], [self._thresh, self._thresh], 'k--',
                 label='Threshold')
        plt.xlim([0, self._buff_len-1])
        plt.ylabel('Amplitude')
        plt.title('Received Signal')
        plt.legend()
        plt.show()

    def detect(self, x_in):
        # Compute the instantaneous power for the current buffer
        x_envelope = cp.abs(x_in)
        # Low-pass filter the envelope without changing the data rate
        self._envelope[:] = cusignal.upfirdn(self._win,
                                             x_envelope)[self._filter_roi]
        # Update threshold
        # Add summation of current envelope to the threshold fifo array
        self._bkg_sum_arr[self._fifo_index] = cp.sum(self._envelope)
        # Update fifo index for next detection window
        self._fifo_index = (self._fifo_index + 1) % self._fifo_len
        # Calculate avg background power level for the previous buffers in fifo
        bkg_avg = np.sum(self._bkg_sum_arr) / (self._fifo_len * self._buff_len)
        # Calculate new threshold value
        self._thresh = bkg_avg * self._thresh_offset

        # Calc index vector where power is above the threshold
        envelope_det_idx = self._envelope > self._thresh
        n_detections = cp.sum(envelope_det_idx)
        # Make sure at least samp_above_thresh are higher than the threshold
        if n_detections > self._samp_above_thresh:
            # Copy to cupy array as workaround to issue cuSignal #178
            x_out = cp.array(x_in)
            x_out[~envelope_det_idx] = 0  # Zero out samples below threshold
        else:
            x_out = None
        return x_out


def tx_task_fn(sdr, tx_stream, tx_sig, tx_buff_len):
    """ Transmit task that can be made a background process """
    rc = sdr.writeStream(tx_stream, [tx_sig], tx_buff_len)
    if rc.ret != tx_buff_len:
        raise IOError('Tx Error {}:{}'.format(rc.ret, SoapySDR.errToStr(rc.ret)))
    print('*', end='', flush=True)  # print an asterisk when a signal is repeated


def main():
    pars = parse_command_line_arguments()

    #  Initialize the AIR-T receiver, set sample rate, gain, and frequency
    sdr = SoapySDR.Device()
    sdr.setSampleRate(SOAPY_SDR_RX, pars.channel, pars.samp_rate)
    if pars.rx_gain.lower() == 'agc':  # Turn on AGC
        sdr.setGainMode(SOAPY_SDR_RX, pars.channel, True)
    else:  # set manual gain
        sdr.setGain(SOAPY_SDR_RX, pars.channel, float(pars.rx_gain))
    sdr.setFrequency(SOAPY_SDR_RX, pars.channel, pars.freq)

    #  Initialize the AIR-T transmitter, set sample rate, gain, and frequency
    sdr.setSampleRate(SOAPY_SDR_TX, pars.channel, pars.samp_rate)
    sdr.setGain(SOAPY_SDR_TX, pars.channel, float(pars.tx_gain))
    sdr.setFrequency(SOAPY_SDR_TX, pars.channel, pars.freq)

    # Create SDR shared memory buffer, detector
    buff = cusignal.get_shared_mem(pars.buff_len, dtype=cp.complex64)
    detr = PowerDetector(buff, pars.threshold)

    # Turn on the transmitter
    tx_stream = sdr.setupStream(SOAPY_SDR_TX, SOAPY_SDR_CF32, [pars.channel])
    sdr.activateStream(tx_stream)
    # Set up a thread pool to asynchronously execute transmit requests
    tx_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

    # Turn on the receiver
    rx_stream = sdr.setupStream(SOAPY_SDR_RX, SOAPY_SDR_CF32, [pars.channel])
    sdr.activateStream(rx_stream)

    # Start processing Data
    print('Looking for signals to repeat. Press ctrl-c to exit.')
    while True:
        try:
            sr = sdr.readStream(rx_stream, [buff], pars.buff_len)  # Read data
            if sr.ret == SOAPY_SDR_OVERFLOW:  # Data was dropped
                print('O', end='', flush=True)
                continue
            detected_sig = detr.detect(buff)
            if detected_sig is not None:
                # AIR-T transmitter currently only accepts numpy arrays or lists
                tx_sig = cp.asnumpy(detected_sig)
                tx_executor.submit(tx_task_fn, sdr, tx_stream, tx_sig,
                                   pars.buff_len)
                detr.plot_envelope(buff)  # Plot the signal and envelope
        except KeyboardInterrupt:
            break
    sdr.closeStream(rx_stream)
    sdr.closeStream(tx_stream)


if __name__ == '__main__':
    main()

Last update: August 26, 2020