Creating a Repeater with CuPy and the AIR-T
This tutorial code will run on AIR-T hardware running AirStack 2.0.0 or later. If you are running an older version of AirStack, see the previous version of this tutorial here, which uses the deprecated cuSignal library.
The goal of this tutorial is to combine the previous tutorials into a real-world example that continuously receives signals from the AIR-T, performs detection using the GPU, and repeats any signal that passes the detector's threshold. The steps in the process are:
- Step 1: Create a GPU power detector using CuPy
- Step 2: Start a transmit task that sends any signal data array that is passed in to the AIR-T's RF transmitter
- Step 3: Start the transceiver and continuously receive signal data
- Step 4: Repeat detected signals by sending them to the transmit task
IF YOU CHOOSE TO USE YOUR PRODUCT TO TRANSMIT USING AN ANTENNA, IT IS YOUR RESPONSIBILITY TO MAKE SURE THAT YOU ARE IN COMPLIANCE WITH ALL LAWS FOR THE COUNTRY, FREQUENCY, AND POWER LEVELS IN WHICH THE DEVICE IS USED. IT IS THE RESPONSIBILITY OF THE USER TO MAINTAIN COMPLIANCE WITH ALL LOCAL LAWS AND REGULATIONS. DEEPWAVE DIGITAL IS NOT RESPONSIBLE FOR ANY LAWSUITS, FINES, OR DAMAGES THAT VIOLATE THESE LAWS OR POLICIES.
Prerequisites
In this tutorial, we will build upon a few of the earlier tutorials. If you have not already done so, we recommend going through the following tutorials for prerequisites:
- Receiving Samples with Python
- Transmitting on the AIR-T
- Multithreading on the AIR-T
- Detecting and Labeling Training Data for Signal Classification
Step 1: Create a GPU Based Power Detector
The first step of this tutorial is to create a PowerDetector class in Python that will:
- Calculate the instantaneous power envelope of a complex-valued input signal
- Apply a finite impulse response (FIR) low-pass filter to the signal's power envelope
- Calculate the average background power level and store that value in a FIFO buffer that will be used to continuously update a detection threshold
- Calculate the number of samples within the current array of signal data that are above the calculated threshold
- If the calculated number of samples is above a configurable samp_above_thresh setting, declare that a signal has been detected
Initializing the PowerDetector Class
The constructor for the PowerDetector class configures the detector using the detection threshold (specified in decibels above the average background power level), the number of filter taps, the normalized filter cutoff frequency, and the number of samples above the threshold required to declare a detection. The input buffer is also provided here. The detector assumes that all data buffers it is given are the same length, and it runs the detection process once during construction. This pre-compiles several GPU kernels and sets up the digital filtering for the provided buffer size and data type, so that the first call to detect() from the receive loop does not take significantly longer to execute.
In order to efficiently perform the low-pass filtering, we will calculate the filter coefficients at initialization rather than each time the detect method is called. Additionally, we will preallocate all GPU arrays (using CuPy) and create a first-in-first-out (FIFO) CPU array that will be used to keep track of the average background level for the previous 100 windows. This array is used to compute a cheap moving average on the CPU. The result of this moving average is a dynamic threshold that will adjust based on the spectral environment.
The implementation of the constructor is shown below.
def __init__(self, buff, thresh_db, ntaps=65, cutoff=0.02, samp_above_thresh=20):
    self._thresh_offset = 10 ** (thresh_db / 10)  # Convert thresh to linear
    self._thresh = float('inf')
    self._samp_above_thresh = samp_above_thresh
    # Calculate filter coefficients
    filt_coef = signal.firwin(ntaps, cutoff, window=('kaiser', 0.5))
    group_delay = int(ntaps / 2)
    self._buff_len = len(buff)
    # upfirdn returns array w/ padding so define index for signal ROI
    self._filter_roi = group_delay + cp.arange(self._buff_len, dtype=int)
    # Preallocate cupy arrays
    self._win = cp.asarray(filt_coef, dtype=cp.float32)
    self._envelope = cp.zeros(self._buff_len, dtype=cp.float32)
    self._seg_det_index = cp.zeros(self._buff_len, dtype=bool)
    # Create fifo for continuous threshold calculation
    self._fifo_len = 100
    self._fifo_index = 0
    self._bkg_sum_arr = np.full(self._fifo_len, np.inf)
    # Run detector one time to compile the CUDA kernels
    self.detect(buff)
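The threshold bookkeeping in the constructor can be illustrated without a GPU. The following is a minimal pure-Python sketch, not the tutorial's implementation: the BackgroundThreshold class and its update method are hypothetical names, and plain lists stand in for the NumPy FIFO. It shows the dB-to-linear conversion and the circular FIFO of per-buffer envelope sums. Because every FIFO slot starts at infinity, the threshold stays infinite (so nothing can be detected) until the FIFO has been completely filled.

```python
import math


class BackgroundThreshold:
    """Hypothetical pure-Python stand-in for the detector's FIFO threshold."""

    def __init__(self, thresh_db, buff_len, fifo_len=100):
        self.offset = 10 ** (thresh_db / 10)  # dB -> linear ratio
        self.buff_len = buff_len
        self.fifo = [math.inf] * fifo_len     # unfilled slots hold inf
        self.index = 0

    def update(self, envelope_sum):
        """Store the newest per-buffer sum and return the new threshold."""
        self.fifo[self.index] = envelope_sum
        self.index = (self.index + 1) % len(self.fifo)  # wrap around
        bkg_avg = sum(self.fifo) / (len(self.fifo) * self.buff_len)
        return bkg_avg * self.offset


# Small illustrative values: 3-slot FIFO, 4 samples per buffer, 10 dB offset
tracker = BackgroundThreshold(thresh_db=10, buff_len=4, fifo_len=3)
thresholds = [tracker.update(s) for s in (4.0, 8.0, 12.0, 16.0)]
print(thresholds)  # [inf, inf, 20.0, 30.0]
```

With the tutorial's defaults (a 100-entry FIFO), the same reasoning suggests the detector needs 100 buffers before the threshold becomes finite; the call to detect() made in the constructor counts as the first of them.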
Performing the Detection
The first step in performing the power detection is to calculate the envelope of the complex-valued incoming signal, i.e., the absolute value (magnitude) of each sample. Following this calculation, the class applies a low-pass filter to the envelope using the upfirdn function from CuPy. Note that while upfirdn may be used to resample the signal, we are using it here without changing the data rate, which allows us to easily determine the exact start and end sample of each detected signal.
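When upfirdn is used without resampling, its output is a full convolution that is longer than the input by the number of taps minus one, which is why the detector keeps only a region of interest offset by the filter's group delay. The sketch below illustrates that alignment on the CPU; full_convolve is a hypothetical helper and the three-tap filter is purely illustrative, not the 65-tap Kaiser design used in the tutorial.

```python
def full_convolve(taps, x):
    """Full linear convolution: len(x) + len(taps) - 1 output samples."""
    out = [0.0] * (len(x) + len(taps) - 1)
    for i, xi in enumerate(x):
        for j, tj in enumerate(taps):
            out[i + j] += xi * tj
    return out


taps = [0.25, 0.5, 0.25]             # symmetric low-pass taps (illustrative)
x = [0.0, 0.0, 1.0, 0.0, 0.0, 0.0]   # impulse at sample index 2
group_delay = len(taps) // 2         # delay of a symmetric FIR filter

padded = full_convolve(taps, x)      # 8 samples: 6 + 3 - 1
# Keep len(x) samples starting at the group delay so that output index n
# lines up with input index n
aligned = padded[group_delay:group_delay + len(x)]

print(len(padded), aligned)  # 8 [0.0, 0.25, 0.5, 0.25, 0.0, 0.0]
```

The filtered peak stays at index 2, the same position as the input impulse, so sample indices in the envelope map directly onto sample indices in the received buffer.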
The next steps of the detector include calculating the average background power level, updating the FIFO, and determining the current threshold value. Once the threshold for the current window has been determined, we compare the power of each sample in the window to the threshold and determine whether the number of samples above the threshold is high enough to be considered a signal. If the detector determines that a signal is present (i.e., the number of samples above the threshold is more than 20 in the example), then the samples below the threshold are given a value of 0 and the new signal is returned. We assign the samples below the threshold a value of zero so that we are not repeating receiver noise. If a signal is not detected, then a None value is returned and, as will be shown in a later section, nothing will be repeated.
def detect(self, x_in):
    # Compute the instantaneous envelope (magnitude) for the current buffer
    x_envelope = cp.abs(x_in)
    # Low-pass filter the envelope (no change in data rate)
    self._envelope[:] = signal.upfirdn(self._win, x_envelope)[self._filter_roi]
    # Update threshold
    # Add summation of current envelope to the threshold fifo array
    self._bkg_sum_arr[self._fifo_index] = cp.sum(self._envelope)
    # Update fifo index for next detection window
    self._fifo_index = (self._fifo_index + 1) % self._fifo_len
    # Calculate average background power level for the previous buffers in fifo
    bkg_avg = np.sum(self._bkg_sum_arr) / (self._fifo_len * self._buff_len)
    # Calculate new threshold value
    self._thresh = bkg_avg * self._thresh_offset
    # Calc index vector where power is above the threshold
    envelope_det_idx = self._envelope > self._thresh
    n_detections = cp.sum(envelope_det_idx)
    # Make sure more than samp_above_thresh samples are above the threshold
    if n_detections > self._samp_above_thresh:
        x_in[~envelope_det_idx] = 0  # Zero out samples below threshold
    else:
        x_in = None
    return x_in
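The decision at the end of detect() can be summarized with a small CPU-only sketch. Here gate_samples is a hypothetical helper, plain lists replace the CuPy arrays, and the envelope is left unfiltered for brevity. Unlike the method above, which zeroes the receive buffer in place, the sketch returns a new list.

```python
def gate_samples(samples, envelope, thresh, samp_above_thresh=20):
    """Return samples with sub-threshold entries zeroed, or None if too few
    envelope values exceed the threshold."""
    above = [e > thresh for e in envelope]
    if sum(above) > samp_above_thresh:   # strictly more than the setting
        return [s if keep else 0j for s, keep in zip(samples, above)]
    return None


samples = [0.1 + 0.1j, 2 + 0j, 0 + 3j, 0.2 + 0j]
envelope = [abs(s) for s in samples]     # unfiltered magnitude

repeated = gate_samples(samples, envelope, thresh=1.0, samp_above_thresh=1)
rejected = gate_samples(samples, envelope, thresh=1.0, samp_above_thresh=2)
print(repeated)  # [0j, (2+0j), 3j, 0j]
print(rejected)  # None
```

Note that the comparison is strict: two samples above the threshold do not satisfy a samp_above_thresh of 2.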
Step 2: Start a Transmit Task
The code in this part of the example implements the background task that sends a signal array, tx_sig, to a transmit stream without blocking the receive processing. It sends the signal just once and raises an IOError if the transmit call does not send the full buffer; on success it prints an asterisk to the screen.
The detailed procedure for creating a transmit task was previously shown in the Multithreading on the AIR-T tutorial. Please refer to that tutorial for the complete details to design and use your own transmit tasks.
def tx_task_fn(sdr, tx_stream, tx_sig, tx_buff_len):
    """ Transmit task that can be made a background process """
    rc = sdr.writeStream(tx_stream, [tx_sig], tx_buff_len)
    if rc.ret != tx_buff_len:
        raise IOError('Tx Error {}:{}'.format(rc.ret, SoapySDR.errToStr(rc.ret)))
    print('*', end='', flush=True)  # print an asterisk when a signal is repeated
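Because this task is submitted to a ThreadPoolExecutor in the later steps, an exception raised inside it is stored on the returned Future rather than printed. One possible way to surface such failures is a done-callback, sketched below with only the standard library. Both flaky_tx_task and report_failure are hypothetical names, and the task merely simulates a short write instead of calling the SDR.

```python
import concurrent.futures


def flaky_tx_task(n_sent, n_expected):
    """Hypothetical stand-in for tx_task_fn: raise on a short write."""
    if n_sent != n_expected:
        raise IOError('Tx Error: sent {} of {}'.format(n_sent, n_expected))
    return n_sent


errors = []


def report_failure(future):
    """Done-callback: record any exception the background task raised."""
    exc = future.exception()
    if exc is not None:
        errors.append(str(exc))


# Leaving the with-block waits for all submitted tasks to finish
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    for n_sent in (1024, 512):
        fut = executor.submit(flaky_tx_task, n_sent, 1024)
        fut.add_done_callback(report_failure)

print(errors)  # ['Tx Error: sent 512 of 1024']
```

Without the callback (or a call to result() or exception() on the Future), the second task's error would go unnoticed.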
Step 3: Start the Transceiver
The receiver will be started the same way as in each of the previous tutorials. The transmitter task will be launched as it was in the Multithreading on the AIR-T tutorial. The only noticeable difference from the previous receiver tutorials is that the receiver memory buffer is created using the numba cuda.mapped_array() function call instead of a NumPy array. Creating the memory buffer this way enables the detector to use the zero-copy architecture of the AIR-T. Once the code segment below has finished executing, the receiver will be filling the buffer with samples and the transmitter will be actively awaiting a signal array to transmit.
# Initialize the AIR-T receiver, set sample rate, gain, and frequency
sdr = SoapySDR.Device()
sdr.setSampleRate(SOAPY_SDR_RX, pars.channel, pars.samp_rate)
if pars.rx_gain.lower() == 'agc':  # Turn on AGC
    sdr.setGainMode(SOAPY_SDR_RX, pars.channel, True)
else:  # set manual gain
    sdr.setGain(SOAPY_SDR_RX, pars.channel, float(pars.rx_gain))
sdr.setFrequency(SOAPY_SDR_RX, pars.channel, pars.freq)
# Initialize the AIR-T transmitter, set sample rate, gain, and frequency
sdr.setSampleRate(SOAPY_SDR_TX, pars.channel, pars.samp_rate)
sdr.setGain(SOAPY_SDR_TX, pars.channel, float(pars.tx_gain))
sdr.setFrequency(SOAPY_SDR_TX, pars.channel, pars.freq)
# Create SDR shared memory buffer, detector
buff = cuda.mapped_array(pars.buff_len, dtype=cp.complex64)
detr = PowerDetector(buff, pars.threshold)
# Turn on the transmitter
tx_stream = sdr.setupStream(SOAPY_SDR_TX, SOAPY_SDR_CF32, [pars.channel])
sdr.activateStream(tx_stream)
# Setup thread subclass to asynchronously execute transmit requests
tx_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
# Turn on the receiver
rx_stream = sdr.setupStream(SOAPY_SDR_RX, SOAPY_SDR_CF32, [pars.channel])
sdr.activateStream(rx_stream)
Step 4: Repeat Detected Signals
Now that all of the building blocks have been put in place and the transceiver streams are activated, we can place the reading, detection, and transmitting of samples in a continuous while loop. The while loop will read data, perform error checking, send the received signal buffer to the detector class, and repeat any detected signals. This while loop will run continuously until the user exits by pressing ctrl-c.
print('Looking for signals to repeat. Press ctrl-c to exit.')
while True:
    try:
        sr = sdr.readStream(rx_stream, [buff], pars.buff_len)  # Read data
        if sr.ret == SOAPY_SDR_OVERFLOW:  # Data was dropped
            print('O', end='', flush=True)
            continue
        detected_sig = detr.detect(buff)
        if detected_sig is not None:
            # AIR-T transmitter currently only accepts numpy arrays or lists
            tx_sig = cp.asnumpy(detected_sig)
            tx_executor.submit(tx_task_fn, sdr, tx_stream, tx_sig, pars.buff_len)
            detr.plot_envelope(buff)  # Plot the signal and envelope
    except KeyboardInterrupt:
        break
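The loop above checks only for an overflow. A SoapySDR read returns either the number of samples read or a negative error code, so a fuller check might also catch other errors and short reads before handing the buffer to the detector. The sketch below is one possible approach, not part of the tutorial: classify_read is a hypothetical helper, and the constants are local stand-ins assumed to mirror SoapySDR's values so the example runs without the library.

```python
# Local stand-ins assumed to mirror SoapySDR's error codes; in real code,
# import these names from SoapySDR instead of redefining them.
SOAPY_SDR_TIMEOUT = -1
SOAPY_SDR_OVERFLOW = -4


def classify_read(ret, buff_len):
    """Hypothetical helper: map a readStream return value to an action."""
    if ret == SOAPY_SDR_OVERFLOW:
        return 'overflow'   # samples were dropped; skip this buffer
    if ret < 0:
        return 'error'      # any other negative code, e.g. a timeout
    if ret != buff_len:
        return 'short'      # fewer samples than requested
    return 'ok'


results = [classify_read(r, 32768)
           for r in (32768, SOAPY_SDR_OVERFLOW, SOAPY_SDR_TIMEOUT, 1000)]
print(results)  # ['ok', 'overflow', 'error', 'short']
```

In the receive loop, only the 'ok' case would be passed on to detect(); how to handle the other cases depends on the application.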
Conclusion
This tutorial provides a detailed example of how to use the AIR-T, AirStack, CuPy, and the GPU to perform signal detection, filtering, and repeating. The complete implementation of the code described in this tutorial is shown below and can be used on any AIR-T that is running AirStack 2.0.0 or later.
Complete Code Listing
This code example is Open Source, made available under the terms of the BSD 3-Clause License. It may be found in the AirStack Examples repository on GitHub and is reproduced below.
#!/usr/bin/env python3
#
# Copyright 2025, Deepwave Digital, Inc.
# SPDX-License-Identifier: BSD-3-Clause
import sys
import argparse
import concurrent.futures
import SoapySDR
from SoapySDR import SOAPY_SDR_TX, SOAPY_SDR_RX
from SoapySDR import SOAPY_SDR_CF32, SOAPY_SDR_OVERFLOW
import cupy as cp
import cupyx.scipy.signal as signal
from numba import cuda
import numpy as np
from matplotlib import pyplot as plt
def parse_command_line_arguments():
    help_formatter = argparse.ArgumentDefaultsHelpFormatter
    parser = argparse.ArgumentParser(description='Signal detector and repeater',
                                     formatter_class=help_formatter)
    parser.add_argument('-s', type=float, required=False, dest='samp_rate',
                        default=7.8128e6, help='Receiver sample rate in SPS')
    parser.add_argument('-t', type=int, required=False, dest='threshold',
                        default=5, help='Detection threshold above noise floor.')
    parser.add_argument('-f', type=float, required=False, dest='freq',
                        default=315e6, help='Receiver tuning frequency in Hz')
    parser.add_argument('-c', type=int, required=False, dest='channel',
                        default=0, help='Receiver channel')
    parser.add_argument('-g', type=str, required=False, dest='rx_gain',
                        default='agc', help='Receive Gain value in dB')
    parser.add_argument('-G', type=float, required=False, dest='tx_gain',
                        default=0, help='Transmit Gain value in dB')
    parser.add_argument('-n', type=int, required=False, dest='buff_len',
                        default=32768, help='Data buffer size (complex samples)')
    return parser.parse_args(sys.argv[1:])
class PowerDetector:
    """ Real-time power detector class for finding signals with AIR-T"""

    def __init__(self, buff, thresh_db, ntaps=65, cutoff=0.02,
                 samp_above_thresh=20):
        self._thresh_offset = 10 ** (thresh_db / 10)  # Convert thresh to linear
        self._thresh = float('inf')
        self._samp_above_thresh = samp_above_thresh
        # Calculate filter coefficients
        filt_coef = signal.firwin(ntaps, cutoff,
                                  window=('kaiser', 0.5))
        group_delay = int(ntaps / 2)
        self._buff_len = len(buff)
        # upfirdn returns array w/ padding so define index for signal ROI
        self._filter_roi = group_delay + cp.arange(self._buff_len, dtype=int)
        # Preallocate cupy arrays
        self._win = cp.asarray(filt_coef, dtype=cp.float32)
        self._envelope = cp.zeros(self._buff_len, dtype=cp.float32)
        self._seg_det_index = cp.zeros(self._buff_len, dtype=bool)
        # Create fifo for continuous threshold calculation
        self._fifo_len = 100
        self._fifo_index = 0
        self._bkg_sum_arr = np.full(self._fifo_len, np.inf)
        # Run detector one time to compile the CUDA kernels
        self.detect(buff)

    def plot_envelope(self, x_in=None):
        """ Plot the envelope of the most recent signal. Note that this will
        cause samples to drop if called every loop"""
        plt.figure(1)
        if x_in is not None:
            plt.plot(x_in.real, 'k', label='Signal (Real)')
            plt.plot(x_in.imag, 'g', label='Signal (Imag)')
        plt.plot(cp.asnumpy(self._envelope), 'r', label='Envelope')
        plt.plot([0, self._buff_len-1], [self._thresh, self._thresh], 'k--',
                 label='Threshold')
        plt.xlim([0, self._buff_len-1])
        plt.ylabel('Amplitude (linear)')  # plotted values are not in dB
        plt.title('Received Signal')
        plt.legend()
        plt.show()

    def detect(self, x_in):
        # Compute the instantaneous envelope (magnitude) for the current buffer
        x_envelope = cp.abs(x_in)
        # Low-pass filter the envelope (no change in data rate)
        self._envelope[:] = signal.upfirdn(self._win,
                                           x_envelope)[self._filter_roi]
        # Update threshold
        # Add summation of current envelope to the threshold fifo array
        self._bkg_sum_arr[self._fifo_index] = cp.sum(self._envelope)
        # Update fifo index for next detection window
        self._fifo_index = (self._fifo_index + 1) % self._fifo_len
        # Calculate avg background power level for the previous buffers in fifo
        bkg_avg = np.sum(self._bkg_sum_arr) / (self._fifo_len * self._buff_len)
        # Calculate new threshold value
        self._thresh = bkg_avg * self._thresh_offset
        # Calc index vector where power is above the threshold
        envelope_det_idx = self._envelope > self._thresh
        n_detections = cp.sum(envelope_det_idx)
        # Make sure more than samp_above_thresh samples are above the threshold
        if n_detections > self._samp_above_thresh:
            x_in[~envelope_det_idx] = 0  # Zero out samples below threshold
        else:
            x_in = None
        return x_in
def tx_task_fn(sdr, tx_stream, tx_sig, tx_buff_len):
    """ Transmit task that can be made a background process """
    rc = sdr.writeStream(tx_stream, [tx_sig], tx_buff_len)
    if rc.ret != tx_buff_len:
        raise IOError('Tx Error {}:{}'.format(rc.ret, SoapySDR.errToStr(rc.ret)))
    print('*', end='', flush=True)  # print an asterisk when a signal is repeated
def main():
    pars = parse_command_line_arguments()
    # Initialize the AIR-T receiver, set sample rate, gain, and frequency
    sdr = SoapySDR.Device()
    sdr.setSampleRate(SOAPY_SDR_RX, pars.channel, pars.samp_rate)
    if pars.rx_gain.lower() == 'agc':  # Turn on AGC
        sdr.setGainMode(SOAPY_SDR_RX, pars.channel, True)
    else:  # set manual gain
        sdr.setGain(SOAPY_SDR_RX, pars.channel, float(pars.rx_gain))
    sdr.setFrequency(SOAPY_SDR_RX, pars.channel, pars.freq)
    # Initialize the AIR-T transmitter, set sample rate, gain, and frequency
    sdr.setSampleRate(SOAPY_SDR_TX, pars.channel, pars.samp_rate)
    sdr.setGain(SOAPY_SDR_TX, pars.channel, float(pars.tx_gain))
    sdr.setFrequency(SOAPY_SDR_TX, pars.channel, pars.freq)
    # Create SDR shared memory buffer, detector
    buff = cuda.mapped_array(pars.buff_len, dtype=cp.complex64)
    detr = PowerDetector(buff, pars.threshold)
    # Turn on the transmitter
    tx_stream = sdr.setupStream(SOAPY_SDR_TX, SOAPY_SDR_CF32, [pars.channel])
    sdr.activateStream(tx_stream)
    # Setup thread subclass to asynchronously execute transmit requests
    tx_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    # Turn on the receiver
    rx_stream = sdr.setupStream(SOAPY_SDR_RX, SOAPY_SDR_CF32, [pars.channel])
    sdr.activateStream(rx_stream)
    # Start processing Data
    print('Looking for signals to repeat. Press ctrl-c to exit.')
    while True:
        try:
            sr = sdr.readStream(rx_stream, [buff], pars.buff_len)  # Read data
            if sr.ret == SOAPY_SDR_OVERFLOW:  # Data was dropped
                print('O', end='', flush=True)
                continue
            detected_sig = detr.detect(buff)
            if detected_sig is not None:
                # AIR-T transmitter currently only accepts numpy arrays or lists
                tx_sig = cp.asnumpy(detected_sig)
                tx_executor.submit(tx_task_fn, sdr, tx_stream, tx_sig,
                                   pars.buff_len)
                detr.plot_envelope(buff)  # Plot the signal and envelope
        except KeyboardInterrupt:
            break
    sdr.closeStream(rx_stream)
    sdr.closeStream(tx_stream)
if __name__ == '__main__':
    main()