Receiving Samples with Python

This tutorial shows you how to perform basic operations with the AIR-T using the Python interface. You will learn how to interact with the radio drivers, stream signal data from the radio to the Tegra, and create a one-time plot of the wireless spectrum. We provide the source code, a step-by-step walkthrough, and a video tutorial. Enjoy!

Video Tutorial



Source Code

Single Channel Receive

# hello_world.py
# Import Packages
import numpy as np
from matplotlib import pyplot as plt
import SoapySDR
from SoapySDR import SOAPY_SDR_RX, SOAPY_SDR_CS16

#################################################################################
# Settings
#################################################################################
# Receiver configuration consumed by the acquisition and plotting code
rx_chan = 0                  # Receive channel index: 0 is RX1, 1 is RX2
N = 2 ** 14                  # Complex samples requested per transfer (16384)
fs = 31.25e6                 # Radio sample rate in samples per second
freq = 2.4e9                 # LO tuning frequency in Hz
use_agc = True               # True enables the AGC, False disables it
timeout_us = 5 * 10 ** 6     # readStream timeout in microseconds (5 seconds)
rx_bits = 16                 # The AIR-T's ADC is 16 bits

#################################################################################
# Receive Signal
#################################################################################
#  Initialize the AIR-T receiver using SoapyAIRT
sdr = SoapySDR.Device(dict(driver="SoapyAIRT"))       # Create AIR-T instance
sdr.setSampleRate(SOAPY_SDR_RX, rx_chan, fs)          # Set sample rate
sdr.setGainMode(SOAPY_SDR_RX, rx_chan, use_agc)       # Set the gain mode
sdr.setFrequency(SOAPY_SDR_RX, rx_chan, freq)         # Tune the LO

# Create data buffer and start streaming samples to it. Each complex sample
# is stored as two interleaved int16 values (I then Q), hence 2 * N elements.
rx_buff = np.empty(2 * N, np.int16)  # Create memory buffer for data stream
rx_stream = sdr.setupStream(SOAPY_SDR_RX, SOAPY_SDR_CS16, [rx_chan])
sdr.activateStream(rx_stream)  # this turns the radio on

try:
    # Read the samples from the data buffer
    sr = sdr.readStream(rx_stream, [rx_buff], N, timeoutUs=timeout_us)
    rc = sr.ret  # number of samples read or the error code

    # Validate with explicit exceptions rather than assert, because asserts
    # are removed when Python runs with -O and a bad read would go unnoticed.
    if rc < 0:
        raise RuntimeError(
            'Error Reading Samples from Device (error code = %d)!' % rc)
    if rc != N:
        raise RuntimeError(
            'Error Reading Samples from Device '
            '(expected %d samples, received %d)!' % (N, rc))
finally:
    # Stop streaming even when the read failed so the stream is released
    sdr.deactivateStream(rx_stream)
    sdr.closeStream(rx_stream)

#################################################################################
# Plot Signal
#################################################################################
# Convert interleaved shorts to complex samples normalized to full scale.
# astype(float) yields float64, so the resulting signal is complex128.
s0 = rx_buff.astype(float) / np.power(2.0, rx_bits-1)
s = (s0[::2] + 1j*s0[1::2])

# Take the fourier transform of the signal and perform FFT Shift
S = np.fft.fftshift(np.fft.fft(s, N) / N)

# Time Domain Plot
plt.figure(num=1, figsize=(12.95, 7.8), dpi=150)
plt.subplot(211)
t_us = np.arange(N) / fs / 1e-6
plt.plot(t_us, s.real, 'k', label='I')
plt.plot(t_us, s.imag, 'r', label='Q')
plt.xlim(t_us[0], t_us[-1])
plt.xlabel('Time (us)')
plt.ylabel('Normalized Amplitude')

# Frequency Domain Plot
plt.subplot(212)
# Build the frequency axis from the FFT bin frequencies, shifted the same way
# as S, so that every bin is labelled with its own frequency. The shifted
# baseband axis runs from -fs/2 up to fs/2 - fs/N and is then offset by the
# LO frequency.
f_ghz = (freq + np.fft.fftshift(np.fft.fftfreq(N, d=1.0/fs))) / 1e9
plt.plot(f_ghz, 20*np.log10(np.abs(S)))
plt.xlim(f_ghz[0], f_ghz[-1])
plt.ylim(-100, 0)
plt.xlabel('Frequency (GHz)')
plt.ylabel('Amplitude (dBFS)')
plt.show()

Dual Channel Receive

When receiving on both channels with the AIR-T, users simply pass a list of channel numbers when setting up the stream and a list of buffers when reading from it. If the user wishes to synchronize the two channels, a trigger is required. The trigger may come from an external source or may be generated internally using the PCIe register interface. The method for internal triggering is illustrated in the source code below.

#!/usr/bin/env python3
#
# Copyright 2020, Deepwave Digital, Inc.
# SPDX-License-Identifier: BSD-3-Clause

import numpy as np
from matplotlib import pyplot as plt
import SoapySDR
from SoapySDR import SOAPY_SDR_RX, SOAPY_SDR_CS16, errToStr, SOAPY_SDR_WAIT_TRIGGER

################################################################################
# Settings
################################################################################
# Receiver configuration consumed by the acquisition and plotting code
rx_chan_list = [0, 1]        # Receive channel indices: 0 is RX1, 1 is RX2
N = 2 ** 14                  # Complex samples requested per transfer (16384)
fs = 31.25e6                 # Radio sample rate in samples per second
freq = 2.4e9                 # LO tuning frequency in Hz
use_agc = True               # True enables the AGC, False disables it
timeout_us = 5 * 10 ** 6     # readStream timeout in microseconds (5 seconds)
rx_bits = 16                 # The AIR-T's ADC is 16 bits


#################################################################################
# Dual Channel Synchronization
#################################################################################
def channels_reset(sdr_obj):
    """Reset the receiver channels of the sdr object.

    Performs a read-modify-write on FPGA register 0x0005008C: the current
    value is read, bits 0 and 4 are set, and the result is written back.
    All other bits keep their current value.
    """
    reset_reg_addr = 0x0005008C
    reset_bits = (1 << 0) | (1 << 4)
    current = sdr_obj.readRegister('FPGA', reset_reg_addr)
    sdr_obj.writeRegister('FPGA', reset_reg_addr, current | reset_bits)


def trigger(sdr_obj):
    """Send the trigger signal to the sdr object using a PCIe register.

    Performs a read-modify-write on FPGA register 0x00050040: the current
    value is read, bits 3 and 5 are cleared, and the result is written back.
    All other bits keep their current value.
    """
    trigger_reg_addr = 0x00050040
    bits_to_clear = (1 << 3) | (1 << 5)
    current = sdr_obj.readRegister('FPGA', trigger_reg_addr)
    sdr_obj.writeRegister('FPGA', trigger_reg_addr, current & ~bits_to_clear)


################################################################################
# Receive Signal
################################################################################
#  Initialize the AIR-T receiver using SoapyAIRT
sdr = SoapySDR.Device(dict(driver="SoapyAIRT")) # Create AIR-T instance
# Configure every channel and create one data buffer per channel. Each complex
# sample is stored as two interleaved int16 values, hence 2 * N elements.
rx_buff_list = []
for chan in rx_chan_list:
    sdr.setSampleRate(SOAPY_SDR_RX, chan, fs)          # Set sample rate
    sdr.setGainMode(SOAPY_SDR_RX, chan, use_agc)       # Set the gain mode
    sdr.setFrequency(SOAPY_SDR_RX, chan, freq)         # Tune the LO
    rx_buff_list.append(np.empty(2 * N, np.int16))  # Create memory buffer
rx_stream = sdr.setupStream(SOAPY_SDR_RX, SOAPY_SDR_CS16, rx_chan_list)
channels_reset(sdr)
# Activate with the wait-trigger flag; trigger() is issued below
sdr.activateStream(rx_stream, flags=SOAPY_SDR_WAIT_TRIGGER)

try:
    # Read the samples from the data buffer
    trigger(sdr)  # Trigger channels so they start synchronously
    sr = sdr.readStream(rx_stream, rx_buff_list, N, timeoutUs=timeout_us)
    rc = sr.ret  # number of samples read or the error code

    # Validate with explicit exceptions rather than assert, because asserts
    # are removed when Python runs with -O and a bad read would go unnoticed.
    if rc < 0:
        raise RuntimeError('Error {}: {}'.format(rc, errToStr(rc)))
    if rc != N:
        # A non-negative value is a sample count, not an error code
        raise RuntimeError(
            'Error: expected {} samples, received {}'.format(N, rc))
finally:
    # Stop streaming even when the read failed so the stream is released
    sdr.deactivateStream(rx_stream)
    sdr.closeStream(rx_stream)

################################################################################
# Plot Signal
################################################################################
# One row of axes per channel: column 0 is the time domain plot and column 1
# is the frequency domain plot. sharex='col' therefore shares the time axis
# between all channels and the frequency axis between all channels.
# squeeze=False keeps axs two-dimensional even for a single channel.
fig, axs = plt.subplots(len(rx_chan_list), 2, sharex='col', squeeze=False)

# The time axis is identical for every channel, so compute it once
t_us = np.arange(N) / fs / 1e-6

for ax, rx_buff in zip(axs, rx_buff_list):
    # Convert interleaved shorts to complex samples normalized to full scale.
    # astype(float) yields float64, so the resulting signal is complex128.
    s0 = rx_buff.astype(float) / np.power(2.0, rx_bits-1)
    s = (s0[::2] + 1j*s0[1::2])

    # Time Domain Plot
    ax[0].plot(t_us, s.real, 'k', label='I')
    ax[0].plot(t_us, s.imag, 'r', label='Q')
    ax[0].set_xlim(t_us[0], t_us[-1])
    ax[0].set_xlabel('Time (us)')
    ax[0].set_ylabel('Normalized Amplitude')

    # Frequency Domain Plot
    ax[1].psd(s, len(s), Fs=fs, Fc=freq)
plt.show()

Last update: May 27, 2020