Transmitting and Decoding APRS Packets with ADALM PlutoSDR on Mac OS using Python


            

This post documents the complete, experimental journey of building an AFSK APRS packet transmitter and receiver with the ADALM PlutoSDR software-defined radio, first in MATLAB and then in Python. It covers generating AX.25 frames, AFSK modulation, troubleshooting the hardware connection on macOS, and finally a full packet decoder.

What is APRS?

APRS (Automatic Packet Reporting System) is a digital communications protocol used in amateur radio. It transmits short packets containing position reports, messages, weather data, and more. On VHF it uses AFSK (Audio Frequency Shift Keying) at 1200 baud, with the audio tones frequency-modulated (FM) onto the carrier:

  • Mark frequency: 1200 Hz (represents bit 1)
  • Space frequency: 2200 Hz (represents bit 0)
  • Carrier frequency: 144.800 MHz in Europe, 144.390 MHz in North America
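The tone scheme above can be sketched in a few lines of plain Python. This is a minimal illustration, assuming the 48 kHz audio rate used later in this post; the function name afsk is only a placeholder. The point it shows is that the phase is carried from one bit into the next, so the waveform has no jump when the tone changes.

```python
import math

FS = 48_000                  # audio sample rate in Hz (the rate used later in this post)
BAUD = 1200
MARK, SPACE = 1200, 2200     # tone frequencies in Hz

def afsk(bits, fs=FS, baud=BAUD):
    """Return AFSK samples for a list of 0/1 line states with continuous phase."""
    spb = fs // baud                     # 40 samples per bit at 48 kHz
    phase = 0.0
    out = []
    for bit in bits:
        step = 2 * math.pi * (MARK if bit else SPACE) / fs
        for _ in range(spb):
            out.append(math.sin(phase))
            phase += step                # phase carries over into the next bit
    return out

samples = afsk([1, 0, 1, 1, 0])
# Largest sample-to-sample change; it can never exceed the per-sample phase
# step of the faster (2200 Hz) tone because the phase never jumps.
max_jump = max(abs(b - a) for a, b in zip(samples, samples[1:]))
print(len(samples), round(max_jump, 3))
```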

The underlying framing protocol is AX.25, which handles callsigns, addressing, CRC error checking, NRZI encoding, and bit stuffing.
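Bit stuffing and NRZI are easiest to see on a tiny example. The sketch below is illustrative only (the helper names are placeholders, not part of any library): it stuffs a payload that happens to look like the 0x7E flag and shows that the flag pattern no longer appears in the stuffed stream, then NRZI-encodes the result.

```python
FLAG = [0, 1, 1, 1, 1, 1, 1, 0]      # 0x7E, sent least significant bit first

def bit_stuff(bits):
    """Insert a 0 after every run of five consecutive 1s."""
    out, run = [], 0
    for b in bits:
        out.append(b)
        run = run + 1 if b else 0
        if run == 5:
            out.append(0)
            run = 0
    return out

def nrzi_encode(bits, level=1):
    """A 0 toggles the line state; a 1 leaves it unchanged."""
    out = []
    for b in bits:
        if b == 0:
            level ^= 1
        out.append(level)
    return out

def contains(seq, pattern):
    """True if pattern occurs anywhere in seq."""
    n = len(pattern)
    return any(seq[i:i + n] == pattern for i in range(len(seq) - n + 1))

payload = [0, 1, 1, 1, 1, 1, 1, 0]   # data that happens to look like a flag
stuffed = bit_stuff(payload)
print(stuffed)                       # [0, 1, 1, 1, 1, 1, 0, 1, 0]
print(contains(stuffed, FLAG))       # False
print(nrzi_encode(stuffed))          # [0, 0, 0, 0, 0, 0, 1, 1, 0]
```

Because stuffed data can never contain six 1s in a row, a receiver can treat every 01111110 pattern as a frame boundary.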

Hardware and Software Requirements

  • ADALM PlutoSDR (with a data-capable USB cable)
  • A 2-meter amateur radio antenna
  • An amateur radio licence to transmit on 144 MHz
  • MATLAB with Communications Toolbox and the ADALM-Pluto Radio Support Package, or Python with pyadi-iio, numpy, and scipy

Part 1: MATLAB Implementation

How the MATLAB Transmitter Works

  • Build an AX.25 UI frame with callsign encoding, CRC-16, and bit stuffing
  • Apply NRZI encoding
  • Modulate using AFSK at 48 kHz internally
  • Upsample to 576 kHz to meet the PlutoSDR minimum sample rate
  • Transmit via the PlutoSDR at 144.800 MHz
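The rate arithmetic behind these steps can be checked with a short Python sketch. The packet size below is a hypothetical example (a 70-byte frame before bit stuffing), chosen only to show how airtime follows from the bit count.

```python
from fractions import Fraction

FS_AUDIO = 48_000    # internal audio rate used in this post
FS_PLUTO = 576_000   # PlutoSDR baseband rate used in this post
BAUD     = 1200

samples_per_bit = FS_AUDIO // BAUD              # audio samples per bit
ratio           = Fraction(FS_PLUTO, FS_AUDIO)  # resampling ratio P/Q in lowest terms

def airtime_seconds(n_bits, baud=BAUD):
    """On-air duration of n_bits at the given baud rate."""
    return n_bits / baud

# Hypothetical packet: 30 preamble flags, a 70-byte frame (before bit
# stuffing) and 5 tail flags.
n_bits = (30 + 70 + 5) * 8

print(samples_per_bit)                       # 40
print(ratio.numerator, ratio.denominator)    # 12 1
print(samples_per_bit * ratio)               # 480 (samples per bit at 576 kHz)
print(airtime_seconds(n_bits))               # 0.7
```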

MATLAB Transmitter Code

%% AFSK APRS Packet Generator and Transmitter via ADALM PlutoSDR
clear; clc;

%% ===================== APRS / AX.25 PARAMETERS =====================
CALLSIGN_SRC = 'NOCALL';
SSID_SRC     = 0;
CALLSIGN_DST = 'APRS';
SSID_DST     = 0;

aprs_info = '!4807.38N/01131.00E>Test APRS via PlutoSDR /A=000000';

%% ===================== AFSK PARAMETERS =====================
Fs_audio        = 48000;
Fs_pluto        = 576000;
mark_f          = 1200;
space_f         = 2200;
baud_rate       = 1200;
samples_per_bit = Fs_audio / baud_rate;
Fc_TX           = 144.800e6;

%% ===================== BUILD AX.25 FRAME =====================
frame_bits = build_ax25_frame(CALLSIGN_DST, SSID_DST, CALLSIGN_SRC, SSID_SRC, aprs_info);

%% ===================== PREAMBLE + TAIL =====================
flag_bits = repmat([0 1 1 1 1 1 1 0], 1, 30);
tail_bits = repmat([0 1 1 1 1 1 1 0], 1, 5);

%% ===================== NRZI ENCODING =====================
% Encode preamble, frame and tail as one stream so the NRZI line state
% stays continuous across the closing flag.
nrzi_bits = nrzi_encode([flag_bits, frame_bits, tail_bits]);

%% ===================== AFSK MODULATION @ 48 kHz =====================
% A single call also keeps the tone phase continuous over the whole packet.
full_audio = afsk_modulate(nrzi_bits, mark_f, space_f, Fs_audio, samples_per_bit);

%% ===================== UPSAMPLE to Pluto Sample Rate =====================
[P, Q]     = rat(Fs_pluto / Fs_audio);
full_audio = resample(full_audio, P, Q);
full_audio = full_audio / max(abs(full_audio));

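%% ===================== FM MODULATION TO COMPLEX BASEBAND =====================
% APRS on 2 m is narrowband FM: the AFSK audio modulates the carrier frequency.
% fm_dev is an assumed peak deviation of about 3 kHz.
fm_dev    = 3000;
tx_signal = exp(1j * 2*pi * fm_dev * cumsum(full_audio) / Fs_pluto);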

%% ===================== PLUTO TX =====================
tx = sdrtx('Pluto', ...
    'RadioID',            'ip:192.168.2.1', ...
    'CenterFrequency',    Fc_TX, ...
    'BasebandSampleRate', Fs_pluto, ...
    'Gain',               -10);

fprintf('Transmitting on %.3f MHz...\n', Fc_TX/1e6);

for k = 1:3
    transmitRepeat(tx, tx_signal);
    pause(2);
end

release(tx);
fprintf('Done.\n');

%% ================================================================
%%                        HELPER FUNCTIONS
%% ================================================================

function bits = build_ax25_frame(dst_call, dst_ssid, src_call, src_ssid, info)
    dst_bytes  = encode_ax25_addr(dst_call, dst_ssid, false);
    src_bytes  = encode_ax25_addr(src_call, src_ssid, true);
    control    = uint8(3);
    pid        = uint8(240);
    info_bytes = uint8(info);
    frame_data = [dst_bytes, src_bytes, control, pid, info_bytes];
    crc_val    = compute_crc16(frame_data);
    crc_lo     = uint8(bitand(crc_val, uint16(255)));
    crc_hi     = uint8(bitshift(crc_val, -8));
    full_frame = [frame_data, crc_lo, crc_hi];
    bits       = bytes_to_bits_lsb(full_frame);
    bits       = bit_stuff(bits);
end

function addr_bytes = encode_ax25_addr(callsign, ssid, is_last)
    callsign = upper(callsign);
    while length(callsign) < 6
        callsign = [callsign ' '];
    end
    addr_bytes = uint8(callsign(1:6)) * uint8(2);
    ssid_byte  = bitor(uint8(96), uint8(ssid * 2));
    if is_last
        ssid_byte = bitor(ssid_byte, uint8(1));
    end
    addr_bytes = [addr_bytes, ssid_byte];
end

function crc = compute_crc16(data)
    crc  = uint16(65535);
    poly = uint16(33800);
    for i = 1:length(data)
        b   = uint16(data(i));
        crc = bitxor(crc, b);
        for j = 1:8
            if bitand(crc, uint16(1))
                crc = bitxor(bitshift(crc, -1), poly);
            else
                crc = bitshift(crc, -1);
            end
        end
    end
    crc = bitxor(crc, uint16(65535));
end

function bits = bytes_to_bits_lsb(bytes)
    n    = length(bytes);
    bits = zeros(1, n * 8, 'uint8');
    for i = 1:n
        b = bytes(i);
        for k = 0:7
            bits((i-1)*8 + k + 1) = uint8(bitand(b, bitshift(uint8(1), k)) > 0);
        end
    end
    bits = double(bits);
end

function stuffed = bit_stuff(bits)
    stuffed = zeros(1, length(bits) + floor(length(bits)/5), 'double');
    idx     = 0;
    count   = 0;
    for i = 1:length(bits)
        idx          = idx + 1;
        stuffed(idx) = bits(i);
        if bits(i) == 1
            count = count + 1;
            if count == 5
                idx          = idx + 1;
                stuffed(idx) = 0;
                count        = 0;
            end
        else
            count = 0;
        end
    end
    stuffed = stuffed(1:idx);
end

function nrzi = nrzi_encode(bits)
    nrzi    = zeros(1, length(bits));
    current = 1;
    for i = 1:length(bits)
        if bits(i) == 0
            current = 1 - current;
        end
        nrzi(i) = current;
    end
end

function audio = afsk_modulate(bits, f_mark, f_space, Fs, spb)
    spb   = round(spb);
    n     = length(bits);
    audio = zeros(n * spb, 1);
    phase = 0;
    for i = 1:n
        freq  = f_mark * bits(i) + f_space * (1 - bits(i));
        t     = (0:spb-1).' / Fs;
        chunk = sin(2*pi*freq*t + phase);
        audio((i-1)*spb + (1:spb)) = chunk;
        phase = mod(phase + 2*pi*freq*spb/Fs, 2*pi);
    end
end
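The address encoding in encode_ax25_addr is easy to verify by hand: each callsign character is shifted left by one bit, and the seventh byte carries the SSID plus the "last address" flag in bit 0. A small Python cross-check, written here as an illustrative sketch rather than a port of the MATLAB code:

```python
def encode_ax25_addr(callsign, ssid, is_last):
    """Encode a callsign and SSID as a 7-byte AX.25 address field."""
    padded = callsign.upper().ljust(6)[:6]          # pad with spaces to 6 characters
    field = [ord(c) << 1 for c in padded]           # each character shifted left by 1
    field.append(0x60 | ((ssid & 0x0F) << 1) | (1 if is_last else 0))
    return bytes(field)

def decode_ax25_addr(field):
    """Inverse of encode_ax25_addr: returns (callsign, ssid, is_last)."""
    callsign = "".join(chr(b >> 1) for b in field[:6]).strip()
    return callsign, (field[6] >> 1) & 0x0F, bool(field[6] & 1)

dst = encode_ax25_addr("APRS", 0, False)
src = encode_ax25_addr("NOCALL", 0, True)
print(dst.hex())               # 82a0a4a6404060
print(src.hex())               # 9c9e8682989861
print(decode_ax25_addr(src))   # ('NOCALL', 0, True)
```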

MATLAB Troubleshooting Notes

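  • bitand type errors: MATLAB's bitand, bitor, and bitxor require both arguments to be the same integer class; the only exception is pairing an integer with a scalar double. Mixing uint8 with uint16 or uint32 raises an error, and hex literals such as 0xFF are stored as uint8 (R2019b and later), not double. Use explicit casts such as uint16(65535) so every operand has the intended type.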
  • BasebandSampleRate too low: The PlutoSDR requires a minimum sample rate of 65,104 Hz via raw libiio, and 521,000 Hz via pyadi-iio. Generate audio at 48 kHz internally, then upsample to 576 kHz (an exact 12x integer ratio) before passing to the hardware.
  • RadioID not found: On macOS, do not use the /dev/tty.usbmodem device for SDR data. That port is for serial console access only. MATLAB communicates with the PlutoSDR over IP via the USB virtual network adapter. Use RadioID 'ip:192.168.2.1'.

Part 2: Connecting the PlutoSDR on macOS

The PlutoSDR creates two interfaces when plugged into a Mac via USB:

  • /dev/tty.usbmodem*: serial console for shell access (not used for SDR data)
  • Virtual USB network adapter (ECM/RNDIS): used by MATLAB and Python for IQ data, at IP 192.168.2.1 by default

Verifying the Connection in Terminal

ping -c 4 192.168.2.1
ifconfig | grep -B5 "192.168.2"
system_profiler SPUSBDataType | grep -A 10 "0456"

If the interface exists but shows inactive, the USB cable may be charge-only. Replace it with a data-capable cable. If the interface is active but has no IP address, assign one manually, replacing en5 with the interface name reported by ifconfig:

sudo ifconfig en5 192.168.2.10 netmask 255.255.255.0
ping 192.168.2.1

Once ping responds, the PlutoSDR is reachable and both MATLAB and Python will connect successfully.
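When assigning an address by hand, the host must sit in the same subnet as the PlutoSDR or ping will not reach it. The check below is a small sketch using only Python's standard ipaddress module; same_subnet is a hypothetical helper name, and 192.168.2.1 is the default device address used throughout this post.

```python
import ipaddress

PLUTO_IP = "192.168.2.1"   # PlutoSDR default address, as used in this post

def same_subnet(host_ip, netmask, device_ip=PLUTO_IP):
    """True if device_ip lies inside the network defined by host_ip/netmask."""
    network = ipaddress.ip_network(f"{host_ip}/{netmask}", strict=False)
    return ipaddress.ip_address(device_ip) in network

print(same_subnet("192.168.2.10", "255.255.255.0"))   # True
print(same_subnet("192.168.1.10", "255.255.255.0"))   # False
```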

Part 3: Python Implementation

Installation

pip install pyadi-iio numpy scipy

Quick Connection Test

import adi

candidates = ['ip:192.168.2.1', 'ip:pluto.local', 'usb:0']

for uri in candidates:
    try:
        sdr = adi.Pluto(uri=uri)
        print(f'Connected via {uri}')
        print(f'   Sample rate:  {sdr.sample_rate} Hz')
        print(f'   TX LO:        {sdr.tx_lo} Hz')
        print(f'   RX LO:        {sdr.rx_lo} Hz')
        break
    except Exception as e:
        print(f'Failed {uri}: {e}')

Python Transmitter Code

#!/usr/bin/env python3
"""
AFSK APRS Packet Generator and Transmitter via ADALM PlutoSDR
Requires: pyadi-iio, numpy, scipy
"""

import numpy as np
from scipy.signal import resample_poly
import adi
import time

# ===================== APRS / AX.25 PARAMETERS =====================
CALLSIGN_SRC = 'NOCALL'
SSID_SRC     = 0
CALLSIGN_DST = 'APRS'
SSID_DST     = 0
APRS_INFO    = '!4807.38N/01131.00E>Test APRS via PlutoSDR /A=000000'

# ===================== SAMPLE RATE PARAMETERS =====================
FS_AUDIO  = 48000
FS_PLUTO  = 576_000

# ===================== AFSK PARAMETERS =====================
MARK_FREQ       = 1200
SPACE_FREQ      = 2200
BAUD_RATE       = 1200
SAMPLES_PER_BIT = FS_AUDIO // BAUD_RATE
FC_TX           = 144_800_000
TX_GAIN         = -10

def encode_ax25_addr(callsign, ssid, is_last):
    callsign  = callsign.upper().ljust(6)[:6]
    addr      = [ord(c) << 1 for c in callsign]
    ssid_byte = 0x60 | ((ssid & 0x0F) << 1)
    if is_last:
        ssid_byte |= 0x01
    addr.append(ssid_byte)
    return addr

def compute_crc16(data):
    crc  = 0xFFFF
    poly = 0x8408
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 0x0001:
                crc = (crc >> 1) ^ poly
            else:
                crc >>= 1
    return crc ^ 0xFFFF

def bytes_to_bits_lsb(data):
    bits = []
    for byte in data:
        for k in range(8):
            bits.append((byte >> k) & 1)
    return bits

def bit_stuff(bits):
    stuffed = []
    count   = 0
    for b in bits:
        stuffed.append(b)
        if b == 1:
            count += 1
            if count == 5:
                stuffed.append(0)
                count = 0
        else:
            count = 0
    return stuffed

def nrzi_encode(bits):
    nrzi    = []
    current = 1
    for b in bits:
        if b == 0:
            current ^= 1
        nrzi.append(current)
    return nrzi

def build_ax25_frame(dst_call, dst_ssid, src_call, src_ssid, info):
    dst_bytes  = encode_ax25_addr(dst_call, dst_ssid, False)
    src_bytes  = encode_ax25_addr(src_call, src_ssid, True)
    control    = [0x03]
    pid        = [0xF0]
    info_bytes = list(info.encode('ascii'))
    frame_data = dst_bytes + src_bytes + control + pid + info_bytes
    crc        = compute_crc16(frame_data)
    full_frame = frame_data + [crc & 0xFF, (crc >> 8) & 0xFF]
    bits = bytes_to_bits_lsb(full_frame)
    bits = bit_stuff(bits)
    return bits

def afsk_modulate(bits, f_mark, f_space, fs, spb):
    spb   = int(round(spb))
    audio = np.zeros(len(bits) * spb, dtype=np.float64)
    phase = 0.0
    t     = np.arange(spb) / fs
    for i, bit in enumerate(bits):
        freq  = f_mark if bit == 1 else f_space
        chunk = np.sin(2 * np.pi * freq * t + phase)
        audio[i*spb:(i+1)*spb] = chunk
        phase = (phase + 2 * np.pi * freq * spb / fs) % (2 * np.pi)
    return audio

def upsample_to_pluto(audio, fs_in, fs_out):
    from math import gcd
    g    = gcd(fs_out, fs_in)
    up   = fs_out // g
    down = fs_in  // g
    print(f'  Resampling {fs_in} Hz -> {fs_out} Hz  (P={up}, Q={down})')
    return resample_poly(audio, up, down)

def detect_pluto(candidates=None):
    if candidates is None:
        candidates = ['ip:192.168.2.1', 'ip:pluto.local', 'usb:0', 'usb:1']
    for uri in candidates:
        try:
            print(f'  Trying {uri} ...', end=' ', flush=True)
            sdr = adi.Pluto(uri=uri)
            print('Connected!')
            return sdr, uri
        except Exception as e:
            print(f'Failed: {e}')
    raise RuntimeError('Could not find PlutoSDR. Check USB cable and run: ping 192.168.2.1')

def main():
    print('=== AFSK APRS Transmitter - ADALM PlutoSDR ===\n')

    print('Building AX.25 frame...')
    frame_bits = build_ax25_frame(CALLSIGN_DST, SSID_DST, CALLSIGN_SRC, SSID_SRC, APRS_INFO)
    print(f'  {len(frame_bits)} bits after stuffing')

    print('Modulating AFSK...')
    flag_bits  = [0,1,1,1,1,1,1,0] * 30
    tail_bits  = [0,1,1,1,1,1,1,0] * 5
    # NRZI-encode and modulate preamble, frame and tail as one stream so the
    # line state and the tone phase stay continuous across the closing flag.
    nrzi_bits  = nrzi_encode(flag_bits + frame_bits + tail_bits)
    full_audio = afsk_modulate(nrzi_bits, MARK_FREQ, SPACE_FREQ, FS_AUDIO, SAMPLES_PER_BIT)
    full_audio = upsample_to_pluto(full_audio, FS_AUDIO, FS_PLUTO)
    full_audio /= np.max(np.abs(full_audio))

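    # APRS on 2 m is narrowband FM, so the AFSK audio must modulate the carrier
    # frequency. FM_DEVIATION is an assumed peak deviation of about 3 kHz.
    FM_DEVIATION = 3000
    phase        = 2 * np.pi * FM_DEVIATION * np.cumsum(full_audio) / FS_PLUTO
    scale        = 2**14
    iq_signal    = (scale * np.exp(1j * phase)).astype(np.complex64)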

    print('\nSearching for PlutoSDR...')
    sdr, uri = detect_pluto()

    print(f'\nConfiguring TX on {FC_TX/1e6:.3f} MHz...')
    sdr.sample_rate           = int(FS_PLUTO)
    sdr.tx_lo                 = int(FC_TX)
    sdr.tx_rf_bandwidth       = 200_000
    sdr.tx_hardwaregain_chan0 = int(TX_GAIN)
    sdr.tx_cyclic_buffer      = True

    print('Transmitting...')
    tx_duration = len(iq_signal) / FS_PLUTO
    num_repeats = 3
    total_time  = tx_duration * num_repeats

    sdr.tx(iq_signal)
    print(f'  Cycling for {total_time:.1f} s ({num_repeats} passes)...')
    time.sleep(total_time)

    sdr.tx_destroy_buffer()
    print('\nDone.')

if __name__ == '__main__':
    main()
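The CRC routine used above (reflected polynomial 0x8408, initial value 0xFFFF, final inversion) is the variant catalogued as CRC-16/X-25, so it can be sanity-checked against the standard check string "123456789". The sketch below is a stand-alone copy written for that purpose; crc16_x25 is just an illustrative name.

```python
def crc16_x25(data):
    """CRC-16/X-25 as used for the AX.25 frame check sequence."""
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0x8408 if crc & 1 else crc >> 1
    return crc ^ 0xFFFF

check = crc16_x25(b"123456789")
print(hex(check))                                    # 0x906e

# Appending the FCS low byte first and re-running the CRC over the whole
# frame always gives the same constant, which is how a receiver can verify it.
frame = b"123456789" + bytes([check & 0xFF, check >> 8])
print(hex(crc16_x25(frame)))                         # 0xf47
```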

Python Troubleshooting Notes

  • Sample rate below 521e3: The pyadi-iio library enforces a minimum of 521,000 Hz. Use 576,000 Hz, which is exactly 12 times 48,000 Hz: a clean integer ratio that avoids fractional resampling artifacts.
  • TX cyclic buffer error: When tx_cyclic_buffer is True, call sdr.tx() only once. The buffer loops automatically. Control transmission duration with time.sleep(), then call sdr.tx_destroy_buffer() when done. Calling sdr.tx() a second time in cyclic mode raises an exception.
  • PLL rounding: The AD9363 chip cannot hit exact frequencies. A requested rate of 576,000 Hz may read back as 575,999 Hz, and 144.800 MHz may read back as 144,799,998 Hz. This is normal and has no practical effect on APRS decoding.
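Both the sample-rate rule and the PLL rounding note reduce to simple arithmetic. The following sketch uses the 521,000 Hz minimum quoted above; the helper names and the 10 ppm tolerance are assumptions chosen for illustration, not values taken from pyadi-iio.

```python
import math

PYADI_MIN_RATE = 521_000   # minimum quoted in this post for pyadi-iio

def smallest_multiple(base_rate, minimum=PYADI_MIN_RATE):
    """Smallest integer multiple of base_rate that is >= minimum."""
    return base_rate * math.ceil(minimum / base_rate)

def within_ppm(requested, actual, ppm=10):
    """True if the read-back value is within ppm parts per million of the request."""
    return abs(actual - requested) <= requested * ppm / 1e6

print(smallest_multiple(48_000))                 # 528000 (11 x 48 kHz)
print(576_000 // 48_000)                         # 12, the ratio used in this post
print(within_ppm(576_000, 575_999))              # True
print(within_ppm(144_800_000, 144_799_998))      # True
```

A check like within_ppm is a convenient way to compare sdr.sample_rate or sdr.tx_lo with the requested value without failing on a 1 to 2 Hz rounding difference.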

Part 4: Python APRS Decoder

Decoder Pipeline

  • Capture IQ at 576 kHz from the PlutoSDR RX port
  • Downsample to 48 kHz
  • FM discriminator to extract instantaneous frequency
  • Bandpass filter from 1000 to 2400 Hz
  • Dual-tone energy detection comparing 1200 Hz and 2200 Hz envelopes
  • Bit clock recovery using zero-crossing synchronisation
  • NRZI decode, then HDLC deframing (flag search and bit destuffing)
  • CRC-16 verification
  • AX.25 frame decode (callsign, SSID, digipeaters)
  • APRS info field parse (position, message, status, weather, object)
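The FM discriminator step is the least obvious part of the pipeline, so here is a dependency-free sketch of the idea. It frequency-modulates a 1200 Hz test tone onto complex baseband and recovers it from the phase difference between consecutive samples. The 3 kHz deviation is an assumed, typical value, and the signal is synthetic (no noise, no frequency offset).

```python
import cmath
import math

FS = 48_000          # sample rate in Hz
DEVIATION = 3_000    # assumed peak FM deviation in Hz
TONE = 1_200         # test tone in Hz

# Modulating audio: 10 ms of a 1200 Hz sine.
audio = [math.sin(2 * math.pi * TONE * k / FS) for k in range(480)]

# FM modulator: the audio sets the instantaneous frequency, so the phase
# is the running sum of the audio samples.
phase, iq = 0.0, []
for a in audio:
    phase += 2 * math.pi * DEVIATION * a / FS
    iq.append(cmath.exp(1j * phase))

# Discriminator: the angle of s[n] * conj(s[n-1]) is the phase step per sample.
demod = [cmath.phase(b * a.conjugate()) for a, b in zip(iq, iq[1:])]

# Scale the phase steps back to the original audio amplitude.
recovered = [d * FS / (2 * math.pi * DEVIATION) for d in demod]
error = max(abs(r - a) for r, a in zip(recovered, audio[1:]))
print(error < 1e-9)   # True
```

The same conjugate-product trick is what fm_demodulate in the decoder applies to the downsampled IQ stream.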

Python Decoder Code

#!/usr/bin/env python3
"""
AFSK APRS Packet Decoder via ADALM PlutoSDR
Requires: pyadi-iio, numpy, scipy
"""

import numpy as np
from scipy.signal import resample_poly, firwin, lfilter, hilbert
from scipy.signal import butter, sosfilt
import adi
import time
from datetime import datetime

FC_RX           = 144_800_000
FS_PLUTO        = 576_000
FS_AUDIO        = 48_000
MARK_FREQ       = 1200
SPACE_FREQ      = 2200
BAUD_RATE       = 1200
SAMPLES_PER_BIT = FS_AUDIO // BAUD_RATE
RX_BUFFER_SIZE  = 576_000 * 2

def downsample(samples, fs_in, fs_out):
    from math import gcd
    g    = gcd(fs_in, fs_out)
    up   = fs_out // g
    down = fs_in  // g
    return resample_poly(samples, up, down)

def fm_demodulate(iq):
    conj_product = iq[1:] * np.conj(iq[:-1])
    return np.angle(conj_product)

def bandpass_filter(signal, f_low, f_high, fs, order=6):
    nyq = fs / 2.0
    sos = butter(order, [f_low/nyq, f_high/nyq], btype='band', output='sos')
    return sosfilt(sos, signal)

def tone_filter(signal, freq, fs, bw=200.0):
    return bandpass_filter(signal, freq - bw/2, freq + bw/2, fs)

def afsk_demodulate(audio, fs, f_mark, f_space):
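    # Each tone filter must pass the 1200 baud keying sidebands, so its
    # bandwidth is set to the baud rate rather than a few hundred hertz.
    mark_filtered  = tone_filter(audio, f_mark,  fs, bw=BAUD_RATE)
    space_filtered = tone_filter(audio, f_space, fs, bw=BAUD_RATE)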
    mark_env  = np.abs(hilbert(mark_filtered))
    space_env = np.abs(hilbert(space_filtered))
    diff = mark_env - space_env
    spb     = fs // BAUD_RATE
    lp_taps = firwin(spb + 1, BAUD_RATE / (fs / 2))
    diff    = lfilter(lp_taps, 1.0, diff)
    return diff

def recover_bits(soft_bits, fs, baud):
    spb  = fs / baud
    hard = np.asarray(soft_bits) > 0
    if not np.any(hard[1:] != hard[:-1]):
        return []
    bits    = []
    counter = 0.0
    for i in range(1, len(hard)):
        counter += 1.0
        if hard[i] != hard[i - 1]:
            # Every zero crossing marks a bit edge: restart the bit clock so
            # the next sample is taken half a bit later, in mid-bit.
            counter = spb / 2
        elif counter >= spb:
            counter -= spb
            bits.append(1 if hard[i] else 0)
    return bits

def nrzi_decode(bits):
    if not bits:
        return []
    decoded = []
    prev    = bits[0]
    for b in bits[1:]:
        decoded.append(0 if b != prev else 1)
        prev = b
    return decoded

def bit_destuff(bits):
    destuffed = []
    count     = 0
    i         = 0
    while i < len(bits):
        b = bits[i]
        destuffed.append(b)
        if b == 1:
            count += 1
            if count == 5:
                i     += 1
                count  = 0
        else:
            count = 0
        i += 1
    return destuffed

def bits_to_bytes(bits):
    data = []
    for i in range(0, len(bits) - 7, 8):
        byte = 0
        for k in range(8):
            byte |= bits[i+k] << k
        data.append(byte)
    return data

def verify_crc(frame_bytes):
    if len(frame_bytes) < 3:
        return False
    data   = frame_bytes[:-2]
    crc_rx = frame_bytes[-2] | (frame_bytes[-1] << 8)
    crc    = 0xFFFF
    poly   = 0x8408
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 1:
                crc = (crc >> 1) ^ poly
            else:
                crc >>= 1
    return (crc ^ 0xFFFF) == crc_rx

def decode_ax25_addr(addr_bytes):
    callsign = ''.join(chr(b >> 1) for b in addr_bytes[:6]).strip()
    ssid     = (addr_bytes[6] >> 1) & 0x0F
    return callsign, ssid

def decode_ax25_frame(frame_bytes):
    if len(frame_bytes) < 16:
        return None
    dst_call, dst_ssid = decode_ax25_addr(frame_bytes[0:7])
    src_call, src_ssid = decode_ax25_addr(frame_bytes[7:14])
    offset = 14
    digis  = []
    while not (frame_bytes[offset - 1] & 0x01):
        if offset + 7 > len(frame_bytes):
            break
        digi_call, digi_ssid = decode_ax25_addr(frame_bytes[offset:offset+7])
        digis.append(f'{digi_call}-{digi_ssid}' if digi_ssid else digi_call)
        offset += 7
    if offset + 2 > len(frame_bytes):
        return None
    control = frame_bytes[offset]
    pid     = frame_bytes[offset + 1]
    offset += 2
    info_bytes = frame_bytes[offset:-2]
    try:
        info = bytes(info_bytes).decode('ascii', errors='replace').strip()
    except Exception:
        info = repr(bytes(info_bytes))
    return {
        'src':   f'{src_call}-{src_ssid}' if src_ssid else src_call,
        'dst':   f'{dst_call}-{dst_ssid}' if dst_ssid else dst_call,
        'digis': digis,
        'ctrl':  control,
        'pid':   pid,
        'info':  info,
    }

def parse_aprs_position(info, has_timestamp=False):
    try:
        offset   = 8 if has_timestamp else 1
        lat_str  = info[offset:offset+8]
        lat_deg  = float(lat_str[:2])
        lat_min  = float(lat_str[2:7])
        lat_hemi = lat_str[7]
        latitude = lat_deg + lat_min / 60.0
        if lat_hemi == 'S':
            latitude = -latitude
        sym_table = info[offset+8]
        lon_str   = info[offset+9:offset+18]
        lon_deg   = float(lon_str[:3])
        lon_min   = float(lon_str[3:8])
        lon_hemi  = lon_str[8]
        longitude = lon_deg + lon_min / 60.0
        if lon_hemi == 'W':
            longitude = -longitude
        sym_code = info[offset+18] if len(info) > offset+18 else ''
        comment  = info[offset+19:] if len(info) > offset+19 else ''
        return {
            'type':      'position',
            'latitude':  round(latitude,  5),
            'longitude': round(longitude, 5),
            'sym_table': sym_table,
            'sym_code':  sym_code,
            'comment':   comment,
        }
    except Exception:
        return {'type': 'position_raw', 'raw': info}

def parse_aprs_info(info):
    if not info:
        return {'type': 'unknown'}
    symbol = info[0]
    if symbol in ('!', '='):
        return parse_aprs_position(info)
    if symbol in ('@', '/'):
        return parse_aprs_position(info, has_timestamp=True)
    if symbol == '>':
        return {'type': 'status', 'text': info[1:]}
    if symbol == ':' and len(info) >= 10:
        return {'type': 'message', 'to': info[1:10].strip(), 'text': info[11:]}
    if symbol in ('`', "'"):
        return {'type': 'mice', 'raw': info}
    if symbol == ';':
        return {'type': 'object', 'raw': info[1:]}
    if symbol == '_':
        return {'type': 'weather', 'raw': info[1:]}
    return {'type': 'unknown', 'raw': info}

def extract_frames(bits):
    """Find HDLC flags in the NRZI-decoded (still bit-stuffed) stream and
    return the CRC-valid frames found between them."""
    flag     = [0,1,1,1,1,1,1,0]
    min_bits = 18 * 8   # 14 address + control + PID + 2 FCS bytes
    frames   = []
    n        = len(bits)
    start    = None     # index of the first bit after the most recent flag
    i        = 0
    while i <= n - 8:
        if bits[i:i+8] == flag:
            if start is not None:
                # Destuff only the payload between two flags; destuffing the
                # whole stream would also strip a 1 from every flag.
                frame_bits = bit_destuff(bits[start:i])
                if len(frame_bits) >= min_bits and len(frame_bits) % 8 == 0:
                    frame_bytes = bits_to_bytes(frame_bits)
                    if verify_crc(frame_bytes):
                        frames.append(frame_bytes)
            # A closing flag also opens the next frame.
            start = i + 8
            i    += 8
        else:
            i += 1
    return frames

def print_packet(frame, aprs, timestamp):
    print('\n' + '='*60)
    print(f'  {timestamp}')
    print(f'  {frame["src"]} -> {frame["dst"]}', end='')
    if frame['digis']:
        print(f'  via {", ".join(frame["digis"])}', end='')
    print()
    print(f'  Raw info: {frame["info"]}')
    t = aprs.get('type', 'unknown')
    if t == 'position':
        print(f'  Position:  {aprs["latitude"]:.5f}, {aprs["longitude"]:.5f}')
        print(f'  Comment:   {aprs["comment"]}')
    elif t == 'status':
        print(f'  Status:    {aprs["text"]}')
    elif t == 'message':
        print(f'  Message to {aprs["to"]}: {aprs["text"]}')
    elif t == 'weather':
        print(f'  Weather:   {aprs["raw"]}')
    elif t == 'object':
        print(f'  Object:    {aprs["raw"]}')
    else:
        print(f'  Type:      {t}')
    print('='*60)

def main():
    print('=== AFSK APRS Decoder - ADALM PlutoSDR ===')
    print(f'    Frequency:   {FC_RX/1e6:.3f} MHz')
    print(f'    Sample rate: {FS_PLUTO/1e3:.0f} kHz')
    print(f'    Baud rate:   {BAUD_RATE} bps')
    print(f'    Tones:       {MARK_FREQ}/{SPACE_FREQ} Hz\n')

    sdr = adi.Pluto('ip:192.168.2.1')
    sdr.sample_rate                 = int(FS_PLUTO)
    sdr.rx_lo                       = int(FC_RX)
    sdr.rx_rf_bandwidth             = 200_000
    sdr.gain_control_mode_chan0     = 'slow_attack'
    sdr.rx_buffer_size              = RX_BUFFER_SIZE

    print('Connected')
    print(f'  RX LO:        {sdr.rx_lo/1e6:.3f} MHz')
    print(f'  Sample rate:  {sdr.sample_rate} Hz')
    print('\nListening for APRS packets... (Ctrl+C to stop)\n')

    packets_decoded = 0

    try:
        while True:
            iq      = sdr.rx()
            iq_down = downsample(iq.astype(np.complex64), FS_PLUTO, FS_AUDIO)
            audio   = fm_demodulate(iq_down)
            audio   = bandpass_filter(audio, 1000, 2400, FS_AUDIO)
            soft_bits    = afsk_demodulate(audio, FS_AUDIO, MARK_FREQ, SPACE_FREQ)
            raw_bits     = recover_bits(soft_bits, FS_AUDIO, BAUD_RATE)
            if len(raw_bits) < 100:
                continue
            # Flags are searched in the stuffed stream; extract_frames
            # destuffs each frame after locating its opening and closing flag.
            decoded_bits = nrzi_decode(raw_bits)
            frames       = extract_frames(decoded_bits)
            for frame_bytes in frames:
                ax25 = decode_ax25_frame(frame_bytes)
                if ax25 is None:
                    continue
                aprs      = parse_aprs_info(ax25['info'])
                timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                packets_decoded += 1
                print_packet(ax25, aprs, timestamp)
                print(f'  Total packets decoded: {packets_decoded}')

    except KeyboardInterrupt:
        print(f'\n\nStopped. Total packets decoded: {packets_decoded}')

    finally:
        del sdr

if __name__ == '__main__':
    main()
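The coordinate conversion inside parse_aprs_position can be checked on its own. APRS sends latitude as DDMM.mmH and longitude as DDDMM.mmH, so decimal degrees are degrees plus minutes divided by 60, negated for the southern and western hemispheres. The helper below is an illustrative sketch (ddmm_to_degrees is not a function from the decoder above):

```python
def ddmm_to_degrees(field):
    """Convert an APRS 'DDMM.mmH' or 'DDDMM.mmH' field to signed decimal degrees."""
    hemisphere = field[-1]
    deg_digits = len(field) - 6          # 2 digits for latitude, 3 for longitude
    degrees = int(field[:deg_digits])
    minutes = float(field[deg_digits:-1])
    value = degrees + minutes / 60.0
    return -value if hemisphere in "SW" else value

print(round(ddmm_to_degrees("4807.38N"), 5))     # 48.123
print(round(ddmm_to_degrees("01131.00E"), 5))    # 11.51667
```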

Expected Decoder Output

=== AFSK APRS Decoder - ADALM PlutoSDR ===
    Frequency:   144.800 MHz
    Sample rate: 576 kHz
    Baud rate:   1200 bps
    Tones:       1200/2200 Hz

Listening for APRS packets... (Ctrl+C to stop)

============================================================
  2024-03-24 14:32:07
  NOCALL -> APRS
  Raw info: !4807.38N/01131.00E>Test APRS via PlutoSDR /A=000000
  Position:  48.12300, 11.51667
  Comment:   Test APRS via PlutoSDR /A=000000
============================================================
  Total packets decoded: 1

Verifying Reception with Other Tools

  • Direwolf (macOS/Linux): install with brew install direwolf and pipe audio into it for software TNC decoding
  • APRSDroid (Android): connect via Bluetooth TNC or microphone
  • PocketPacket (iOS): listens via the device microphone
  • aprs.fi: packets appear on the map within seconds once an internet-connected station (an iGate) receives them and forwards them to APRS-IS

Key Lessons Learned

  • Always use a data-capable USB cable. Charge-only cables are the most common reason the PlutoSDR network interface shows as inactive on macOS.
  • The /dev/tty.usbmodem port is not used for SDR data. Both MATLAB and Python communicate over IP at 192.168.2.1.
  • The AD9363 PLL rounds frequencies slightly. A requested 576,000 Hz may read back as 575,999 Hz. This is normal and does not affect APRS decoding.
  • In pyadi-iio cyclic TX mode, call sdr.tx() only once. The hardware loops the buffer automatically. A second call raises an exception.
  • Choose a Pluto sample rate that is an exact integer multiple of 48,000 Hz. This gives a clean resampling ratio with no fractional artifacts. 576,000 Hz (12x) clears the 521,000 Hz pyadi-iio minimum with some margin; 528,000 Hz (11x) is the smallest multiple that does.
  • MATLAB's bitwise functions require both arguments to share the same integer class (a scalar double is the only permitted exception). Never mix uint8 and uint16 values in the same bitand, bitor, or bitxor call.