Skip to content

Transmitting on the AIR-T

In this tutorial we show you how to perform basic transmit functionality with the AIR-T using the Python interface. You will learn how to interact with the radio drivers in order to stream signal data to the radio from the Tegra. We provide the source code, and plenty of comments. The code is broken down into two main functions: make_tone and transmit_tone. Enjoy!


Creating a Tone Signal

First, we must to ensure that we are able to create a tone that is head-to-tail repeatable without any phase discontinuities. To do this, we provide a function that creates a tone segment where the frequency is an integer multiple of the sample rate. The output of the function is a numpy array that contains the interleaved complex int16 samples.

def make_tone(n, fcen, fs, phi=0.285):
    """
    Generates tone signal window with a frequency that is an integer
    multiple of the sample rate so it can be repeated without a phase
    discontinuity.
    """
    period = fs / fcen
    assert n % period == 0, 'Total samples is not an integer number of periods'
    a = 2**15
    # Make Complex Valued Tone Signal
    wt = np.array(2 * np.pi * fcen * np.arange(n) / fs)
    sig_cplx = np.exp(1j * (wt + phi))
    # Convert to interleaved int16 values
    sig_int16 = np.empty(2 * n, dtype=np.int16)
    sig_int16[0::2] = 32767 * sig_cplx.real
    sig_int16[1::2] = 32767 * sig_cplx.imag
    return sig_int16


Transmitting a Signal

Second, we transmit a signal created by the make_tone function above with SoapySDR API. The function below will continuously transmit the signal until the user exits using Ctrl-C.

def transmit_tone(freq, chan=0, fs=31.25, gain=-20, buff_len=16384):
    """ Transmit a tone out of the AIR-T """
    # Generate tone buffer that can be repeated without phase discontunity
    bb_freq = fs / 8  # baseband frequency of tone
    tx_buff = make_tone(buff_len, bb_freq, fs)
    lo_freq = freq - bb_freq  # Calc LO freq to put tone at tone_rf

    # Setup Radio
    sdr = SoapySDR.Device()  # Create AIR-T instance
    sdr.setSampleRate(SOAPY_SDR_TX, chan, fs)  # Set sample rate
    sdr.setFrequency(SOAPY_SDR_TX, chan, lo_freq)  # Tune the LO
    sdr.setGain(SOAPY_SDR_TX, chan, gain)

    tx_stream = sdr.setupStream(SOAPY_SDR_TX, SOAPY_SDR_CS16, [chan])
    sdr.activateStream(tx_stream)  # this turns the radio on

    # Transmit
    print('Now Transmitting')
    while True:
        try:
            rc = sdr.writeStream(tx_stream, [tx_buff], buff_len)
            if rc.ret != buff_len:
                print('TX Error {}: {}'.format(rc.ret, errToStr(rc.ret)))
        except KeyboardInterrupt:
            break

    # Stop streaming
    sdr.deactivateStream(tx_stream)
    sdr.closeStream(tx_stream)


Complete Code - Command Line Interface

Finally, we combine the two functions into an executable python file with command line input arguments.

#!/usr/bin/env python3

"""
Transmits a tone out of the AIR-T. The script will create a tone segment that
is infinity repeatable without a phase discontinuity and with 8 samples per
period. The TX LO of the AIR-T is set such that the baseband frequency of the
generated tone plus the LO frequency will transmit at the desired RF.
"""
import sys
import numpy as np
import argparse
import SoapySDR
from SoapySDR import SOAPY_SDR_TX, SOAPY_SDR_CS16, errToStr


def make_tone(n, fcen, fs, phi=0.285):
    """
    Generates tone signal window with a frequency that is an integer
    multiple of the sample rate so it can be repeated without a phase
    discontinuity.
    """
    period = fs / fcen
    assert n % period == 0, 'Total samples not integer number of periods'
    a = 2**15
    # Make Complex Valued Tone Signal
    wt = np.array(2 * np.pi * fcen * np.arange(n) / fs)
    sig_cplx = np.exp(1j * (wt + phi))
    # Convert to interleaved int16 values
    sig_int16 = np.empty(2 * n, dtype=np.int16)
    sig_int16[0::2] = 32767 * sig_cplx.real
    sig_int16[1::2] = 32767 * sig_cplx.imag
    return sig_int16


def transmit_tone(freq, chan=0, fs=31.25, gain=-20, buff_len=16384):
    """ Transmit a tone out of the AIR-T """
    # Generate tone buffer that can be repeated without phase discontunity
    bb_freq = fs / 8  # baseband frequency of tone
    tx_buff = make_tone(buff_len, bb_freq, fs)
    lo_freq = freq - bb_freq  # Calc LO freq to put tone at tone_rf

    # Setup Radio
    sdr = SoapySDR.Device()  # Create AIR-T instance
    sdr.setSampleRate(SOAPY_SDR_TX, chan, fs)  # Set sample rate
    sdr.setFrequency(SOAPY_SDR_TX, chan, lo_freq)  # Tune the LO
    sdr.setGain(SOAPY_SDR_TX, chan, gain)

    tx_stream = sdr.setupStream(SOAPY_SDR_TX, SOAPY_SDR_CS16, [chan])
    sdr.activateStream(tx_stream)  # this turns the radio on

    # Transmit
    print('Now Transmitting')
    while True:
        try:
            rc = sdr.writeStream(tx_stream, [tx_buff], buff_len)
            if rc.ret != buff_len:
                print('TX Error {}: {}'.format(rc.ret, errToStr(rc.ret)))
        except KeyboardInterrupt:
            break

    # Stop streaming
    sdr.deactivateStream(tx_stream)
    sdr.closeStream(tx_stream)


def parse_command_line_arguments():
    """ Create command line options for transmit function """
    help_formatter = argparse.ArgumentDefaultsHelpFormatter
    parser = argparse.ArgumentParser(description='Transmit a tone on the AIR-T',
                                     formatter_class=help_formatter)
    parser.add_argument('-f', type=float, required=False, dest='freq',
                        default=2400e6, help='TX Tone Frequency')
    parser.add_argument('-c', type=int, required=False, dest='chan',
                        default=0, help='TX Channel Number [0 or 1]')
    parser.add_argument('-s', type=float, required=False, dest='fs',
                        default=31.25e6, help='TX Sample Rate')
    parser.add_argument('-g', type=float, required=False, dest='gain',
                        default=0, help='TX gain')
    parser.add_argument('-n', type=int, required=False, dest='buff_len',
                        default=16384, help='TX Buffer Size')
    return parser.parse_args(sys.argv[1:])


if __name__ == '__main__':
    pars = parse_command_line_arguments()
    transmit_tone(pars.freq, pars.chan, pars.fs, pars.gain, pars.buff_len)


This function may be called from the command line of the AIR-T to transmit a tone. By default, it will transmit a tone at 2.4 GHz, but we also provide a help menu as follows to change the signal parameters.


$ ./transmit_tone.py -h
usage: transmit_tone.py [-h] [-f FREQ] [-c CHAN] [-s FS] [-g GAIN]
                        [-n BUFF_LEN]

Transmit a tone from the AIR-T

optional arguments:
  -h, --help   show this help message and exit
  -f FREQ      TX Tone Frequency (default: 2400000000.0)
  -c CHAN      TX Channel Number [0 or 1] (default: 0)
  -s FS        TX Sample Rate (default: 31250000.0)
  -g GAIN      TX gain (default: 0)
  -n BUFF_LEN  TX Buffer Size (default: 16384)

Asynchronous Transmit

As of AirStack 1.0, the AIR-T has the capability to transmit buffers asynchronously. That is, a buffer can be set to start at a particular time or to start once an external trigger signal is detected. For timed transmissions, please see the appropriate Tutorial. The following describes how to handle buffers controlled via an external trigger signal and how to work with the new asynchronous TX API.

Before continuing, it is important to familiarize yourself with the concepts of an externally triggered RX stream, which are discussed in a previous Tutorial. The primary difference for TX is how buffers/streams relate to the triggering mechanism. That is, for RX, an entire stream is triggered, where the stream starts once the triggering event occurs and samples are continuously read from the device from that point forward. This is why for RX, activateStream() is where the SOAPY_SDR_WAIT_TRIGGER flag is passed in. For TX, each buffer is controlled individually. That is, once a TX stream is activated, each call to writeStream() controls whether the provided buffer of samples is to start at a particular time, go out immediately, or to wait for an external trigger. As a result, for TX, the SOAPY_SDR_WAIT_TRIGGER flag is provided after the stream is activated, in writeStream().

Queuing the Samples

With asynchronous TX, the meaning of writeStream() changes dramatically. In the synchronous case, writeStream() returns the number of samples that have been transmitted. For asynchronous transmissions, writeStream() will return the number of samples that have been queued and are awaiting some event (i.e., a time or external trigger signal) in order to transmit. Note that in AirStack 1.0, only one transmit buffer can be queued at a given time, and an error will be returned if the transmit queue is full.

The following call to writeStream() shows how to queue a buffer to be transmit once an external trigger signal is detected. Note that the only change is to provide the SOAPY_SDR_WAIT_TRIGGER flag.

# rc.ret contains either the number of samples queued (and waiting for the
# external trigger signal) or an error code
rc = sdr.writeStream(tx_stream, [tx_buff], buff_len, flags=SOAPY_SDR_WAIT_TRIGGER)

Retrieving Status

Each successful call to writeStream() in asynchronous mode (either timed or using an external trigger source) must be followed by call(s) to readStreamStatus() in order to retrieve the status of the channel(s) in the stream. readStreamStatus() will return a status for a particular channel and, as a result, needs to be called multiple times in the case of a multiple channel stream. That is, each channel's status is reported individually, and therefore multiple calls are required for multiple channel streams. See the example code below for details.

max_chans = sdr.getNumChannels(SOAPY_SDR_TX)
results_arr = [None] * max_chans
results_pending = True
while results_pending:
    # Get first available result, if it's available within timeoutUs
    tx_result = sdr.readStreamStatus(tx_stream, timeoutUs=10000000)
    # If the return code is SOAPY_SDR_TIMEOUT, the trigger has not happened yet,
    # so the loop just tries again.
    if tx_result.ret != SOAPY_SDR_TIMEOUT:  # got some result, save it!
        # Determine what channel(s) this result applies to. The code below is
        # generic, but on the AIR-T (as of AirStack 1.0), we report a result for
        # each channel in the stream. For example, for a two channel stream, we
        # will report two results and need to call readStreamStatus() twice. As
        # a result, on the AIR-T, chanMask should only have one bit set since
        # each result only corresponds to a single channel.
        for i in range(max_chans):
            if tx_result.chanMask & (1 << i):
                # chanMask reported that this status applies to channel i
                if results_arr[i] is None:
                    results_arr[i] = tx_result
                else:
                    raise RuntimeError(f"Got multiple results for channel {i}!")

        # Check to see if all channels in this stream have reported a result
        results_pending = False
        # channel_indexes is the list of channels being used by tx_stream. That
        # is, it is the list of channels that was passed in to setupStream() to
        # create tx_stream.
        for chan in channel_indexes:
            if results_arr[chan] is None:
                results_pending = True

Fixed Delay Calibration

Each TX channel can be configured to delay starting samples by a fixed number of clock ticks. This cal_delay value can be used to align the trigger position with samples stream start time. The default and minimum value is 0 and maximum is 255.

cal_delay = 0   # Calibration delay to align trigger position
# Write calibration delay register
cal_reg_addrs = [0x000500B8, 0x000500BC] # Control registers for each channel
sdr.writeRegister('FPGA', cal_reg_addrs[tx_chan], cal_delay)


Last update: February 14, 2024