Transmitting on the AIR-T¶
In this tutorial we show you how to perform basic transmit functionality with the AIR-T using the Python interface. You will learn how to interact with the radio drivers in order to stream signal data to the radio from the Tegra. We provide the source code, and plenty of comments. The code is broken down into two main functions: make_tone and transmit_tone. Enjoy!
Creating a Tone Signal¶
First, we must ensure that we are able to create a tone that is head-to-tail repeatable without any phase discontinuities. To do this, we provide a function that creates a tone segment whose length is an integer number of tone periods (for example, a tone at fs / 8 has exactly 8 samples per period). The output of the function is a numpy array that contains the interleaved complex int16 samples.
def make_tone(n, fcen, fs, phi=0.285):
    """
    Generates a complex tone window that holds a whole number of tone periods
    so it can be repeated head-to-tail without a phase discontinuity.

    Args:
        n: Number of complex samples in the window.
        fcen: Tone frequency, expressed in the same units as fs.
        fs: Sample rate.
        phi: Phase offset of the tone, in radians.

    Returns:
        1-D numpy int16 array of length 2 * n with interleaved samples: real
        (I) parts at even indices, imaginary (Q) parts at odd indices, scaled
        by 32767.

    Raises:
        AssertionError: If n is not an integer number of tone periods.
    """
    period = fs / fcen
    # Raised explicitly instead of using an assert statement so the check is
    # still enforced under "python -O"; the exception type is unchanged.
    if n % period != 0:
        raise AssertionError('Total samples is not an integer number of periods')
    # Make Complex Valued Tone Signal
    wt = 2 * np.pi * fcen * np.arange(n) / fs
    sig_cplx = np.exp(1j * (wt + phi))
    # Convert to interleaved int16 values
    sig_int16 = np.empty(2 * n, dtype=np.int16)
    sig_int16[0::2] = 32767 * sig_cplx.real
    sig_int16[1::2] = 32767 * sig_cplx.imag
    return sig_int16
Transmitting a Signal¶
Second, we transmit a signal created by the make_tone function above with the SoapySDR API. The function below will continuously transmit the signal until the user exits using Ctrl-C.
def transmit_tone(freq, chan=0, fs=31.25, gain=-20, buff_len=16384):
    """ Transmit a tone out of the AIR-T

    Builds one repeatable tone buffer at a baseband frequency of fs / 8, tunes
    the TX LO to freq minus that baseband frequency so the tone lands at freq,
    and writes the buffer to the radio in a loop until the user presses
    Ctrl-C. The stream is deactivated and closed on the way out, including
    when an unexpected exception escapes the write loop.

    Args:
        freq: RF frequency at which the tone should appear.
        chan: TX channel index handed to the SoapySDR calls.
        fs: Sample rate handed to setSampleRate().
            NOTE(review): the default of 31.25 does not match the 31.25e6
            default used by the command line script - confirm the intended
            units before relying on the default.
        gain: Value handed to setGain().
        buff_len: Number of complex samples in the repeated tone buffer.
    """
    # Generate tone buffer that can be repeated without phase discontinuity
    bb_freq = fs / 8  # baseband frequency of tone
    tx_buff = make_tone(buff_len, bb_freq, fs)
    lo_freq = freq - bb_freq  # Calc LO freq to put tone at freq

    # Setup Radio
    sdr = SoapySDR.Device()  # Create AIR-T instance
    sdr.setSampleRate(SOAPY_SDR_TX, chan, fs)  # Set sample rate
    sdr.setFrequency(SOAPY_SDR_TX, chan, lo_freq)  # Tune the LO
    sdr.setGain(SOAPY_SDR_TX, chan, gain)
    tx_stream = sdr.setupStream(SOAPY_SDR_TX, SOAPY_SDR_CS16, [chan])
    try:
        sdr.activateStream(tx_stream)  # this turns the radio on
        try:
            # Transmit
            print('Now Transmitting')
            while True:
                rc = sdr.writeStream(tx_stream, [tx_buff], buff_len)
                if rc.ret != buff_len:
                    print('TX Error {}: {}'.format(rc.ret, errToStr(rc.ret)))
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to end the transmission
            pass
        finally:
            # Stop streaming, whatever ended the loop
            sdr.deactivateStream(tx_stream)
    finally:
        # Release the stream even if activation or deactivation failed
        sdr.closeStream(tx_stream)
Complete Code - Command Line Interface¶
Finally, we combine the two functions into an executable python file with command line input arguments.
#!/usr/bin/env python3
"""
Transmits a tone out of the AIR-T. The script will create a tone segment that
is infinitely repeatable without a phase discontinuity and with 8 samples per
period. The TX LO of the AIR-T is set such that the baseband frequency of the
generated tone plus the LO frequency will transmit at the desired RF.
"""
import sys
import numpy as np
import argparse
import SoapySDR
from SoapySDR import SOAPY_SDR_TX, SOAPY_SDR_CS16, errToStr
def make_tone(n, fcen, fs, phi=0.285):
    """
    Generates a complex tone window that holds a whole number of tone periods
    so it can be repeated head-to-tail without a phase discontinuity.

    Args:
        n: Number of complex samples in the window.
        fcen: Tone frequency, expressed in the same units as fs.
        fs: Sample rate.
        phi: Phase offset of the tone, in radians.

    Returns:
        1-D numpy int16 array of length 2 * n with interleaved samples: real
        (I) parts at even indices, imaginary (Q) parts at odd indices, scaled
        by 32767.

    Raises:
        AssertionError: If n is not an integer number of tone periods.
    """
    period = fs / fcen
    # Raised explicitly instead of using an assert statement so the check is
    # still enforced under "python -O"; the exception type is unchanged.
    if n % period != 0:
        raise AssertionError('Total samples not integer number of periods')
    # Make Complex Valued Tone Signal
    wt = 2 * np.pi * fcen * np.arange(n) / fs
    sig_cplx = np.exp(1j * (wt + phi))
    # Convert to interleaved int16 values
    sig_int16 = np.empty(2 * n, dtype=np.int16)
    sig_int16[0::2] = 32767 * sig_cplx.real
    sig_int16[1::2] = 32767 * sig_cplx.imag
    return sig_int16
def transmit_tone(freq, chan=0, fs=31.25, gain=-20, buff_len=16384):
    """ Transmit a tone out of the AIR-T

    Builds one repeatable tone buffer at a baseband frequency of fs / 8, tunes
    the TX LO to freq minus that baseband frequency so the tone lands at freq,
    and writes the buffer to the radio in a loop until the user presses
    Ctrl-C. The stream is deactivated and closed on the way out, including
    when an unexpected exception escapes the write loop.

    Args:
        freq: RF frequency at which the tone should appear.
        chan: TX channel index handed to the SoapySDR calls.
        fs: Sample rate handed to setSampleRate().
            NOTE(review): the default of 31.25 does not match the 31.25e6
            default of the -s command line option - confirm the intended
            units before relying on the default.
        gain: Value handed to setGain().
        buff_len: Number of complex samples in the repeated tone buffer.
    """
    # Generate tone buffer that can be repeated without phase discontinuity
    bb_freq = fs / 8  # baseband frequency of tone
    tx_buff = make_tone(buff_len, bb_freq, fs)
    lo_freq = freq - bb_freq  # Calc LO freq to put tone at freq

    # Setup Radio
    sdr = SoapySDR.Device()  # Create AIR-T instance
    sdr.setSampleRate(SOAPY_SDR_TX, chan, fs)  # Set sample rate
    sdr.setFrequency(SOAPY_SDR_TX, chan, lo_freq)  # Tune the LO
    sdr.setGain(SOAPY_SDR_TX, chan, gain)
    tx_stream = sdr.setupStream(SOAPY_SDR_TX, SOAPY_SDR_CS16, [chan])
    try:
        sdr.activateStream(tx_stream)  # this turns the radio on
        try:
            # Transmit
            print('Now Transmitting')
            while True:
                rc = sdr.writeStream(tx_stream, [tx_buff], buff_len)
                if rc.ret != buff_len:
                    print('TX Error {}: {}'.format(rc.ret, errToStr(rc.ret)))
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to end the transmission
            pass
        finally:
            # Stop streaming, whatever ended the loop
            sdr.deactivateStream(tx_stream)
    finally:
        # Release the stream even if activation or deactivation failed
        sdr.closeStream(tx_stream)
def parse_command_line_arguments(argv=None):
    """ Create command line options for transmit function

    Args:
        argv: Optional list of argument strings to parse. When None (the
            default), sys.argv[1:] is parsed, so existing callers that pass
            nothing behave exactly as before.

    Returns:
        argparse.Namespace with the attributes freq, chan, fs, gain and
        buff_len.
    """
    if argv is None:
        argv = sys.argv[1:]
    help_formatter = argparse.ArgumentDefaultsHelpFormatter
    parser = argparse.ArgumentParser(description='Transmit a tone on the AIR-T',
                                     formatter_class=help_formatter)
    parser.add_argument('-f', type=float, required=False, dest='freq',
                        default=2400e6, help='TX Tone Frequency')
    parser.add_argument('-c', type=int, required=False, dest='chan',
                        default=0, help='TX Channel Number [0 or 1]')
    parser.add_argument('-s', type=float, required=False, dest='fs',
                        default=31.25e6, help='TX Sample Rate')
    parser.add_argument('-g', type=float, required=False, dest='gain',
                        default=0, help='TX gain')
    parser.add_argument('-n', type=int, required=False, dest='buff_len',
                        default=16384, help='TX Buffer Size')
    return parser.parse_args(argv)
if __name__ == '__main__':
    # Script entry point: read the command line options and forward each one
    # by name to the transmit routine.
    cli_args = parse_command_line_arguments()
    transmit_tone(
        freq=cli_args.freq,
        chan=cli_args.chan,
        fs=cli_args.fs,
        gain=cli_args.gain,
        buff_len=cli_args.buff_len,
    )
This function may be called from the command line of the AIR-T to transmit a tone. By default, it will transmit a tone at 2.4 GHz, but we also provide a help menu as follows to change the signal parameters.
$ ./transmit_tone.py -h
usage: transmit_tone.py [-h] [-f FREQ] [-c CHAN] [-s FS] [-g GAIN]
[-n BUFF_LEN]
Transmit a tone on the AIR-T
optional arguments:
-h, --help show this help message and exit
-f FREQ TX Tone Frequency (default: 2400000000.0)
-c CHAN TX Channel Number [0 or 1] (default: 0)
-s FS TX Sample Rate (default: 31250000.0)
-g GAIN TX gain (default: 0)
-n BUFF_LEN TX Buffer Size (default: 16384)
Asynchronous Transmit¶
As of AirStack 1.0, the AIR-T has the capability to transmit buffers asynchronously. That is, a buffer can be set to start at a particular time or to start once an external trigger signal is detected. For timed transmissions, please see the appropriate Tutorial. The following describes how to handle buffers controlled via an external trigger signal and how to work with the new asynchronous TX API.
Before continuing, it is important to familiarize yourself with the concepts of an externally triggered RX stream, which are discussed in a previous Tutorial. The primary difference for TX is how buffers/streams relate to the triggering mechanism. For RX, an entire stream is triggered: the stream starts once the triggering event occurs, and samples are continuously read from the device from that point forward. This is why, for RX, the SOAPY_SDR_WAIT_TRIGGER flag is passed in to activateStream(). For TX, each buffer is controlled individually. That is, once a TX stream is activated, each call to writeStream() controls whether the provided buffer of samples is to start at a particular time, go out immediately, or wait for an external trigger. As a result, for TX, the SOAPY_SDR_WAIT_TRIGGER flag is provided after the stream is activated, in writeStream().
Queuing the Samples¶
With asynchronous TX, the meaning of writeStream() changes dramatically. In the synchronous case, writeStream() returns the number of samples that have been transmitted. For asynchronous transmissions, writeStream() will return the number of samples that have been queued and are awaiting some event (i.e., a time or external trigger signal) in order to transmit. Note that in AirStack 1.0, only one transmit buffer can be queued at a given time, and an error will be returned if the transmit queue is full.
The following call to writeStream() shows how to queue a buffer to be transmitted once an external trigger signal is detected. Note that the only change is to provide the SOAPY_SDR_WAIT_TRIGGER flag.
# Queue tx_buff so that it is sent once the external trigger signal arrives.
# On return, rc.ret holds the number of samples queued (and waiting for the
# trigger), or an error code.
rc = sdr.writeStream(
    tx_stream,
    [tx_buff],
    buff_len,
    flags=SOAPY_SDR_WAIT_TRIGGER,
)
Retrieving Status¶
Each successful call to writeStream() in asynchronous mode (either timed or using an external trigger source) must be followed by call(s) to readStreamStatus() in order to retrieve the status of the channel(s) in the stream. readStreamStatus() will return a status for a particular channel and, as a result, needs to be called multiple times in the case of a multiple channel stream. That is, each channel's status is reported individually, and therefore multiple calls are required for multiple channel streams. See the example code below for details.
max_chans = sdr.getNumChannels(SOAPY_SDR_TX)
results_arr = [None] * max_chans  # one status slot per TX channel
results_pending = True
while results_pending:
    # Wait up to timeoutUs for the next status report on this stream
    tx_result = sdr.readStreamStatus(tx_stream, timeoutUs=10000000)

    # SOAPY_SDR_TIMEOUT means the trigger has not fired yet: poll again
    if tx_result.ret == SOAPY_SDR_TIMEOUT:
        continue

    # Something was reported, so store it against every channel flagged in
    # chanMask. This walk over the bits is generic; on the AIR-T (as of
    # AirStack 1.0) a separate result is reported for each channel in the
    # stream, so a two channel stream needs two readStreamStatus() calls and
    # chanMask should only ever have a single bit set.
    for i in range(max_chans):
        if not tx_result.chanMask & (1 << i):
            continue  # this status does not apply to channel i
        if results_arr[i] is not None:
            raise RuntimeError(f"Got multiple results for channel {i}!")
        results_arr[i] = tx_result

    # Keep polling until every channel of this stream has reported.
    # channel_indexes is the list of channels used by tx_stream, i.e. the
    # list that was passed to setupStream() when tx_stream was created.
    unreported = [chan for chan in channel_indexes if results_arr[chan] is None]
    results_pending = bool(unreported)
Fixed Delay Calibration¶
Each TX channel can be configured to delay starting samples by a fixed number of clock ticks. This cal_delay value can be used to align the trigger position with the sample stream start time. The default and minimum value is 0 and the maximum is 255.
# Calibration delay used to align the trigger position
cal_delay = 0

# One control register per channel, indexed by TX channel number
cal_reg_addrs = [0x000500B8, 0x000500BC]

# Write the calibration delay into the register of the selected TX channel
delay_reg = cal_reg_addrs[tx_chan]
sdr.writeRegister('FPGA', delay_reg, cal_delay)