Module meshtastic.tests.test_node

Meshtastic unit tests for node.py

Expand source code
"""Meshtastic unit tests for node.py"""

import re
import logging

from unittest.mock import patch, MagicMock
import pytest

from ..node import Node
from ..serial_interface import SerialInterface
from ..admin_pb2 import AdminMessage
from ..channel_pb2 import Channel
from ..radioconfig_pb2 import RadioConfig
#from ..cannedmessages_pb2 import (CannedMessagePluginMessagePart1, CannedMessagePluginMessagePart2,
#                                  CannedMessagePluginMessagePart3, CannedMessagePluginMessagePart4,
#                                  CannedMessagePluginMessagePart5)
from ..util import Timeout


@pytest.mark.unit
def test_node(capsys):
    """Test that we can instantiate a Node"""
    anode = Node('foo', 'bar')
    radioConfig = RadioConfig()
    anode.radioConfig = radioConfig
    anode.showChannels()
    anode.showInfo()
    out, err = capsys.readouterr()
    assert re.search(r'Preferences', out)
    assert re.search(r'Channels', out)
    assert re.search(r'Primary channel URL', out)
    assert err == ''


@pytest.mark.unit
def test_node_requestConfig(capsys):
    """Test run requestConfig"""
    iface = MagicMock(autospec=SerialInterface)
    amesg = MagicMock(autospec=AdminMessage)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
            anode = Node(mo, 'bar')
            anode.requestConfig()
    out, err = capsys.readouterr()
    assert re.search(r'Requesting preferences from remote node', out, re.MULTILINE)
    assert err == ''


#@pytest.mark.unit
#def test_node_get_canned_message_with_all_parts(capsys):
#    """Test run get_canned_message()"""
#    iface = MagicMock(autospec=SerialInterface)
#    amesg = MagicMock(autospec=AdminMessage)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
#            # we have a sleep in this method, so override it so it goes fast
#            with patch('time.sleep'):
#                anode = Node(mo, 'bar')
#                anode.cannedPluginMessagePart1 = 'a'
#                anode.cannedPluginMessagePart2 = 'b'
#                anode.cannedPluginMessagePart3 = 'c'
#                anode.cannedPluginMessagePart4 = 'd'
#                anode.cannedPluginMessagePart5 = 'e'
#                anode.get_canned_message()
#    out, err = capsys.readouterr()
#    assert re.search(r'canned_plugin_message:abcde', out, re.MULTILINE)
#    assert err == ''
#
#
#@pytest.mark.unit
#def test_node_get_canned_message_with_some_parts(capsys):
#    """Test run get_canned_message()"""
#    iface = MagicMock(autospec=SerialInterface)
#    amesg = MagicMock(autospec=AdminMessage)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
#            # we have a sleep in this method, so override it so it goes fast
#            with patch('time.sleep'):
#                anode = Node(mo, 'bar')
#                anode.cannedPluginMessagePart1 = 'a'
#                anode.get_canned_message()
#    out, err = capsys.readouterr()
#    assert re.search(r'canned_plugin_message:a', out, re.MULTILINE)
#    assert err == ''
#
#
#@pytest.mark.unit
#def test_node_set_canned_message_one_part(caplog):
#    """Test run set_canned_message()"""
#    iface = MagicMock(autospec=SerialInterface)
#    amesg = MagicMock(autospec=AdminMessage)
#    with caplog.at_level(logging.DEBUG):
#        with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#            with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
#                anode = Node(mo, 'bar')
#                anode.set_canned_message('foo')
#    assert re.search(r"Setting canned message 'foo' part 1", caplog.text, re.MULTILINE)
#    assert not re.search(r"Setting canned message '' part 2", caplog.text, re.MULTILINE)
#
#
#@pytest.mark.unit
#def test_node_set_canned_message_200(caplog):
#    """Test run set_canned_message() 200 characters long"""
#    iface = MagicMock(autospec=SerialInterface)
#    amesg = MagicMock(autospec=AdminMessage)
#    with caplog.at_level(logging.DEBUG):
#        with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#            with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
#                anode = Node(mo, 'bar')
#                message_200_chars_long = 'a' * 200
#                anode.set_canned_message(message_200_chars_long)
#    assert re.search(r" part 1", caplog.text, re.MULTILINE)
#    assert not re.search(r"Setting canned message '' part 2", caplog.text, re.MULTILINE)
#
#
#@pytest.mark.unit
#def test_node_set_canned_message_201(caplog):
#    """Test run set_canned_message() 201 characters long"""
#    iface = MagicMock(autospec=SerialInterface)
#    amesg = MagicMock(autospec=AdminMessage)
#    with caplog.at_level(logging.DEBUG):
#        with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#            with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
#                anode = Node(mo, 'bar')
#                message_201_chars_long = 'a' * 201
#                anode.set_canned_message(message_201_chars_long)
#    assert re.search(r" part 1", caplog.text, re.MULTILINE)
#    assert re.search(r"Setting canned message 'a' part 2", caplog.text, re.MULTILINE)
#
#
#@pytest.mark.unit
#def test_node_set_canned_message_1000(caplog):
#    """Test run set_canned_message() 1000 characters long"""
#    iface = MagicMock(autospec=SerialInterface)
#    amesg = MagicMock(autospec=AdminMessage)
#    with caplog.at_level(logging.DEBUG):
#        with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#            with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
#                anode = Node(mo, 'bar')
#                message_1000_chars_long = 'a' * 1000
#                anode.set_canned_message(message_1000_chars_long)
#    assert re.search(r" part 1", caplog.text, re.MULTILINE)
#    assert re.search(r" part 2", caplog.text, re.MULTILINE)
#    assert re.search(r" part 3", caplog.text, re.MULTILINE)
#    assert re.search(r" part 4", caplog.text, re.MULTILINE)
#    assert re.search(r" part 5", caplog.text, re.MULTILINE)
#
#
#@pytest.mark.unit
#def test_node_set_canned_message_1001(capsys):
#    """Test run set_canned_message() 1001 characters long"""
#    iface = MagicMock(autospec=SerialInterface)
#    with pytest.raises(SystemExit) as pytest_wrapped_e:
#        with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#            anode = Node(mo, 'bar')
#            message_1001_chars_long = 'a' * 1001
#            anode.set_canned_message(message_1001_chars_long)
#    assert pytest_wrapped_e.type == SystemExit
#    assert pytest_wrapped_e.value.code == 1
#    out, err = capsys.readouterr()
#    assert re.search(r'Warning: The canned message', out, re.MULTILINE)
#    assert err == ''


@pytest.mark.unit
def test_setOwner_and_team(caplog):
    """Test setOwner"""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.setOwner(long_name ='Test123', short_name='123', team=1)
    assert re.search(r'p.set_owner.long_name:Test123:', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.short_name:123:', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.is_licensed:False', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.team:1', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_setOwnerShort(caplog):
    """Test setOwner"""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.setOwner(long_name=None, short_name='123')
    assert re.search(r'p.set_owner.short_name:123:', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_setOwner_no_short_name(caplog):
    """Test setOwner"""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.setOwner(long_name ='Test123')
    assert re.search(r'p.set_owner.long_name:Test123:', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.short_name:Tst:', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.is_licensed:False', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.team:0', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_setOwner_no_short_name_and_long_name_is_short(caplog):
    """Test setOwner"""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.setOwner(long_name ='Tnt')
    assert re.search(r'p.set_owner.long_name:Tnt:', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.short_name:Tnt:', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.is_licensed:False', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.team:0', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_setOwner_no_short_name_and_long_name_has_words(caplog):
    """Test setOwner"""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.setOwner(long_name ='A B C', is_licensed=True)
    assert re.search(r'p.set_owner.long_name:A B C:', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.short_name:ABC:', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.is_licensed:True', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.team:0', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_setOwner_long_name_no_short(caplog):
    """Test setOwner"""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.setOwner(long_name ='Aabo', is_licensed=True)
    assert re.search(r'p.set_owner.long_name:Aabo:', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.short_name:Aab:', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_exitSimulator(caplog):
    """Test exitSimulator"""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.exitSimulator()
    assert re.search(r'in exitSimulator', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_reboot(caplog):
    """Test reboot"""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.reboot()
    assert re.search(r'Telling node to reboot', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_shutdown(caplog):
    """Test shutdown"""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.shutdown()
    assert re.search(r'Telling node to shutdown', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_setURL_empty_url(capsys):
    """Test reboot"""
    anode = Node('foo', 'bar', noProto=True)
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode.setURL('')
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    assert re.search(r'Warning: No RadioConfig has been read', out, re.MULTILINE)
    assert err == ''


@pytest.mark.unit
def test_setURL_valid_URL(caplog):
    """Test setURL"""
    iface = MagicMock(autospec=SerialInterface)
    url = "https://www.meshtastic.org/d/#CgUYAyIBAQ"
    with caplog.at_level(logging.DEBUG):
        anode = Node(iface, 'bar', noProto=True)
        anode.radioConfig = 'baz'
        channels = ['zoo']
        anode.channels = channels
        anode.setURL(url)
    assert re.search(r'Channel i:0', caplog.text, re.MULTILINE)
    assert re.search(r'modem_config: MidSlow', caplog.text, re.MULTILINE)
    assert re.search(r'psk: "\\001"', caplog.text, re.MULTILINE)
    assert re.search(r'role: PRIMARY', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_setURL_valid_URL_but_no_settings(capsys):
    """Test setURL"""
    iface = MagicMock(autospec=SerialInterface)
    url = "https://www.meshtastic.org/d/#"
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode = Node(iface, 'bar', noProto=True)
        anode.radioConfig = 'baz'
        anode.setURL(url)
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    assert re.search(r'Warning: There were no settings', out, re.MULTILINE)
    assert err == ''


@pytest.mark.unit
def test_showChannels(capsys):
    """Test showChannels"""
    anode = Node('foo', 'bar')

    # primary channel
    # role: 0=Disabled, 1=Primary, 2=Secondary
    # modem_config: 0-5
    # role: 0=Disabled, 1=Primary, 2=Secondary
    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    channel2 = Channel(index=2, role=2)
    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    channel2.settings.name = 'testing'

    channel3 = Channel(index=3, role=0)
    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]

    anode.channels = channels
    anode.showChannels()
    out, err = capsys.readouterr()
    assert re.search(r'Channels:', out, re.MULTILINE)
    # primary channel
    assert re.search(r'Primary channel URL', out, re.MULTILINE)
    assert re.search(r'PRIMARY psk=default ', out, re.MULTILINE)
    assert re.search(r'"modemConfig": "MidSlow"', out, re.MULTILINE)
    assert re.search(r'"psk": "AQ=="', out, re.MULTILINE)
    # secondary channel
    assert re.search(r'SECONDARY psk=secret ', out, re.MULTILINE)
    assert re.search(r'"psk": "ipR5DsbJHjWREkCmMKi0M4cA8ksO539Bes31sJAwqDQ="', out, re.MULTILINE)
    assert err == ''


@pytest.mark.unit
def test_getChannelByChannelIndex():
    """Test getChannelByChannelIndex()"""
    anode = Node('foo', 'bar')

    channel1 = Channel(index=1, role=1) # primary channel
    channel2 = Channel(index=2, role=2) # secondary channel
    channel3 = Channel(index=3, role=0)
    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]

    anode.channels = channels

    # test primary
    assert anode.getChannelByChannelIndex(0) is not None
    # test secondary
    assert anode.getChannelByChannelIndex(1) is not None
    # test disabled
    assert anode.getChannelByChannelIndex(2) is not None
    # test invalid values
    assert anode.getChannelByChannelIndex(-1) is None
    assert anode.getChannelByChannelIndex(9) is None


@pytest.mark.unit
def test_deleteChannel_try_to_delete_primary_channel(capsys):
    """Try to delete primary channel."""
    anode = Node('foo', 'bar')

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    # no secondary channels
    channel2 = Channel(index=2, role=0)
    channel3 = Channel(index=3, role=0)
    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]

    anode.channels = channels
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode.deleteChannel(0)
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    assert re.search(r'Warning: Only SECONDARY channels can be deleted', out, re.MULTILINE)
    assert err == ''

@pytest.mark.unit
def test_deleteChannel_secondary():
    """Try to delete a secondary channel."""

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    channel2 = Channel(index=2, role=2)
    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    channel2.settings.name = 'testing'

    channel3 = Channel(index=3, role=0)
    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]


    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        anode.channels = channels
        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        assert channels[1].settings.name == 'testing'
        assert channels[2].settings.name == ''
        assert channels[3].settings.name == ''
        assert channels[4].settings.name == ''
        assert channels[5].settings.name == ''
        assert channels[6].settings.name == ''
        assert channels[7].settings.name == ''

        anode.deleteChannel(1)

        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        assert channels[1].settings.name == ''
        assert channels[2].settings.name == ''
        assert channels[3].settings.name == ''
        assert channels[4].settings.name == ''
        assert channels[5].settings.name == ''
        assert channels[6].settings.name == ''
        assert channels[7].settings.name == ''


@pytest.mark.unit
def test_deleteChannel_secondary_with_admin_channel_after_testing():
    """Try to delete a secondary channel where there is an admin channel."""

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    channel2 = Channel(index=2, role=2)
    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    channel2.settings.name = 'testing'

    channel3 = Channel(index=3, role=2)
    channel3.settings.name = 'admin'

    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]


    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
        mo.localNode = anode

        assert mo.localNode == anode

        anode.channels = channels
        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        assert channels[1].settings.name == 'testing'
        assert channels[2].settings.name == 'admin'
        assert channels[3].settings.name == ''
        assert channels[4].settings.name == ''
        assert channels[5].settings.name == ''
        assert channels[6].settings.name == ''
        assert channels[7].settings.name == ''

        anode.deleteChannel(1)

        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        assert channels[1].settings.name == 'admin'
        assert channels[2].settings.name == ''
        assert channels[3].settings.name == ''
        assert channels[4].settings.name == ''
        assert channels[5].settings.name == ''
        assert channels[6].settings.name == ''
        assert channels[7].settings.name == ''


@pytest.mark.unit
def test_deleteChannel_secondary_with_admin_channel_before_testing():
    """Try to delete a secondary channel where there is an admin channel."""

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    channel2 = Channel(index=2, role=2)
    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    channel2.settings.name = 'admin'

    channel3 = Channel(index=3, role=2)
    channel3.settings.name = 'testing'

    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]


    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        anode.channels = channels
        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        assert channels[1].settings.name == 'admin'
        assert channels[2].settings.name == 'testing'
        assert channels[3].settings.name == ''
        assert channels[4].settings.name == ''
        assert channels[5].settings.name == ''
        assert channels[6].settings.name == ''
        assert channels[7].settings.name == ''

        anode.deleteChannel(2)

        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        assert channels[1].settings.name == 'admin'
        assert channels[2].settings.name == ''
        assert channels[3].settings.name == ''
        assert channels[4].settings.name == ''
        assert channels[5].settings.name == ''
        assert channels[6].settings.name == ''
        assert channels[7].settings.name == ''


@pytest.mark.unit
def test_getChannelByName():
    """Get a channel by the name."""
    anode = Node('foo', 'bar')

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    channel2 = Channel(index=2, role=2)
    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    channel2.settings.name = 'admin'

    channel3 = Channel(index=3, role=0)
    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]

    anode.channels = channels
    ch = anode.getChannelByName('admin')
    assert ch.index == 2


@pytest.mark.unit
def test_getChannelByName_invalid_name():
    """Get a channel by the name but one that is not present."""
    anode = Node('foo', 'bar')

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    channel2 = Channel(index=2, role=2)
    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    channel2.settings.name = 'admin'

    channel3 = Channel(index=3, role=0)
    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]

    anode.channels = channels
    ch = anode.getChannelByName('testing')
    assert ch is None


@pytest.mark.unit
def test_getDisabledChannel():
    """Get the first disabled channel."""
    anode = Node('foo', 'bar')

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    channel2 = Channel(index=2, role=2)
    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    channel2.settings.name = 'testingA'

    channel3 = Channel(index=3, role=2)
    channel3.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    channel3.settings.name = 'testingB'

    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]

    anode.channels = channels
    ch = anode.getDisabledChannel()
    assert ch.index == 4


@pytest.mark.unit
def test_getDisabledChannel_where_all_channels_are_used():
    """Get the first disabled channel."""
    anode = Node('foo', 'bar')

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    channel2 = Channel(index=2, role=2)
    channel3 = Channel(index=3, role=2)
    channel4 = Channel(index=4, role=2)
    channel5 = Channel(index=5, role=2)
    channel6 = Channel(index=6, role=2)
    channel7 = Channel(index=7, role=2)
    channel8 = Channel(index=8, role=2)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]

    anode.channels = channels
    ch = anode.getDisabledChannel()
    assert ch is None


@pytest.mark.unit
def test_getAdminChannelIndex():
    """Get the 'admin' channel index."""
    anode = Node('foo', 'bar')

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    channel2 = Channel(index=2, role=2)
    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    channel2.settings.name = 'admin'

    channel3 = Channel(index=3, role=0)
    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]

    anode.channels = channels
    i = anode._getAdminChannelIndex()
    assert i == 2


@pytest.mark.unit
def test_getAdminChannelIndex_when_no_admin_named_channel():
    """Get the 'admin' channel when there is not one."""
    anode = Node('foo', 'bar')

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    channel2 = Channel(index=2, role=0)
    channel3 = Channel(index=3, role=0)
    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]

    anode.channels = channels
    i = anode._getAdminChannelIndex()
    assert i == 0


# TODO: should we check if we need to turn it off?
@pytest.mark.unit
def test_turnOffEncryptionOnPrimaryChannel(capsys):
    """Turn off encryption when there is a psk."""
    anode = Node('foo', 'bar', noProto=True)

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    # value from using "--ch-set psk 0x1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b "
    channel1.settings.psk = b'\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++'

    channel2 = Channel(index=2, role=0)
    channel3 = Channel(index=3, role=0)
    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]

    anode.channels = channels
    anode.turnOffEncryptionOnPrimaryChannel()
    out, err = capsys.readouterr()
    assert re.search(r'Writing modified channels to device', out)
    assert err == ''


@pytest.mark.unit
def test_writeConfig_with_no_radioConfig(capsys):
    """Test writeConfig with no radioConfig."""
    anode = Node('foo', 'bar', noProto=True)

    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode.writeConfig()
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    assert re.search(r'Error: No RadioConfig has been read', out)
    assert err == ''


@pytest.mark.unit
def test_writeConfig(caplog):
    """Test writeConfig"""
    anode = Node('foo', 'bar', noProto=True)
    radioConfig = RadioConfig()
    anode.radioConfig = radioConfig

    with caplog.at_level(logging.DEBUG):
        anode.writeConfig()
    assert re.search(r'Wrote config', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_requestChannel_not_localNode(caplog, capsys):
    """Test _requestChannel()"""
    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)
        with caplog.at_level(logging.DEBUG):
            anode._requestChannel(0)
            assert re.search(r'Requesting channel 0 info from remote node', caplog.text, re.MULTILINE)
        out, err = capsys.readouterr()
        assert re.search(r'Requesting channel 0 info', out, re.MULTILINE)
        assert err == ''


@pytest.mark.unit
def test_requestChannel_localNode(caplog):
    """Test _requestChannel()"""
    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
        mo.localNode = anode

        with caplog.at_level(logging.DEBUG):
            anode._requestChannel(0)
            assert re.search(r'Requesting channel 0', caplog.text, re.MULTILINE)
            assert not re.search(r'from remote node', caplog.text, re.MULTILINE)


#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart1(caplog):
#    """Test onResponseRequestCannedMessagePluginMessagePart1()"""
#
#    part1 = CannedMessagePluginMessagePart1()
#    part1.text = 'foo1'
#
#    msg1 = MagicMock(autospec=AdminMessage)
#    msg1.get_canned_message_plugin_part1_response = part1
#
#    packet = {
#            'from': 682968612,
#            'to': 682968612,
#            'decoded': {
#                'portnum': 'ADMIN_APP',
#                'payload': 'faked',
#                'requestId': 927039000,
#                'admin': {
#                    'getCannedMessagePluginPart1Response': {'text': 'foo1'},
#                    'raw': msg1
#                    }
#                },
#            'id': 589440320,
#            'rxTime': 1642710843,
#            'hopLimit': 3,
#            'priority': 'RELIABLE',
#            'raw': 'faked',
#            'fromId': '!28b54624',
#            'toId': '!28b54624'
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart1(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart1', caplog.text, re.MULTILINE)
#            assert anode.cannedPluginMessagePart1 == 'foo1'


#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart2(caplog):
#    """Test onResponseRequestCannedMessagePluginMessagePart2()"""
#
#    part2 = CannedMessagePluginMessagePart2()
#    part2.text = 'foo2'
#
#    msg2 = MagicMock(autospec=AdminMessage)
#    msg2.get_canned_message_plugin_part2_response = part2
#
#    packet = {
#            'from': 682968612,
#            'to': 682968612,
#            'decoded': {
#                'portnum': 'ADMIN_APP',
#                'payload': 'faked',
#                'requestId': 927039000,
#                'admin': {
#                    'getCannedMessagePluginPart2Response': {'text': 'foo2'},
#                    'raw': msg2
#                    }
#                },
#            'id': 589440320,
#            'rxTime': 1642710843,
#            'hopLimit': 3,
#            'priority': 'RELIABLE',
#            'raw': 'faked',
#            'fromId': '!28b54624',
#            'toId': '!28b54624'
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart2(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart2', caplog.text, re.MULTILINE)
#            assert anode.cannedPluginMessagePart2 == 'foo2'


#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart3(caplog):
#    """Test onResponseRequestCannedMessagePluginMessagePart3()"""
#
#    part3 = CannedMessagePluginMessagePart3()
#    part3.text = 'foo3'
#
#    msg3 = MagicMock(autospec=AdminMessage)
#    msg3.get_canned_message_plugin_part3_response = part3
#
#    packet = {
#            'from': 682968612,
#            'to': 682968612,
#            'decoded': {
#                'portnum': 'ADMIN_APP',
#                'payload': 'faked',
#                'requestId': 927039000,
#                'admin': {
#                    'getCannedMessagePluginPart3Response': {'text': 'foo3'},
#                    'raw': msg3
#                    }
#                },
#            'id': 589440320,
#            'rxTime': 1642710843,
#            'hopLimit': 3,
#            'priority': 'RELIABLE',
#            'raw': 'faked',
#            'fromId': '!28b54624',
#            'toId': '!28b54624'
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart3(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart3', caplog.text, re.MULTILINE)
#            assert anode.cannedPluginMessagePart3 == 'foo3'


#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart4(caplog):
#    """Test onResponseRequestCannedMessagePluginMessagePart4()"""
#
#    part4 = CannedMessagePluginMessagePart4()
#    part4.text = 'foo4'
#
#    msg4 = MagicMock(autospec=AdminMessage)
#    msg4.get_canned_message_plugin_part4_response = part4
#
#    packet = {
#            'from': 682968612,
#            'to': 682968612,
#            'decoded': {
#                'portnum': 'ADMIN_APP',
#                'payload': 'faked',
#                'requestId': 927039000,
#                'admin': {
#                    'getCannedMessagePluginPart4Response': {'text': 'foo4'},
#                    'raw': msg4
#                    }
#                },
#            'id': 589440320,
#            'rxTime': 1642710843,
#            'hopLimit': 3,
#            'priority': 'RELIABLE',
#            'raw': 'faked',
#            'fromId': '!28b54624',
#            'toId': '!28b54624'
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart4(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart4', caplog.text, re.MULTILINE)
#            assert anode.cannedPluginMessagePart4 == 'foo4'


#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart5(caplog):
#    """Test onResponseRequestCannedMessagePluginMessagePart5()"""
#
#    part5 = CannedMessagePluginMessagePart5()
#    part5.text = 'foo5'
#
#    msg5 = MagicMock(autospec=AdminMessage)
#    msg5.get_canned_message_plugin_part5_response = part5
#
#
#    packet = {
#            'from': 682968612,
#            'to': 682968612,
#            'decoded': {
#                'portnum': 'ADMIN_APP',
#                'payload': 'faked',
#                'requestId': 927039000,
#                'admin': {
#                    'getCannedMessagePluginPart5Response': {'text': 'foo5'},
#                    'raw': msg5
#                    }
#                },
#            'id': 589440320,
#            'rxTime': 1642710843,
#            'hopLimit': 3,
#            'priority': 'RELIABLE',
#            'raw': 'faked',
#            'fromId': '!28b54624',
#            'toId': '!28b54624'
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart5(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart5', caplog.text, re.MULTILINE)
#            assert anode.cannedPluginMessagePart5 == 'foo5'


#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart1_error(caplog, capsys):
#    """Test onResponseRequestCannedMessagePluginMessagePart1() with error"""
#
#    packet = {
#            'decoded': {
#                'routing': {
#                    'errorReason': 'some made up error',
#                    },
#                },
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart1(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart1', caplog.text, re.MULTILINE)
#        out, err = capsys.readouterr()
#        assert re.search(r'Error on response', out)
#        assert err == ''


#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart2_error(caplog, capsys):
#    """Test onResponseRequestCannedMessagePluginMessagePart2() with error"""
#
#    packet = {
#            'decoded': {
#                'routing': {
#                    'errorReason': 'some made up error',
#                    },
#                },
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart2(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart2', caplog.text, re.MULTILINE)
#        out, err = capsys.readouterr()
#        assert re.search(r'Error on response', out)
#        assert err == ''


#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart3_error(caplog, capsys):
#    """Test onResponseRequestCannedMessagePluginMessagePart3() with error"""
#
#    packet = {
#            'decoded': {
#                'routing': {
#                    'errorReason': 'some made up error',
#                    },
#                },
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart3(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart3', caplog.text, re.MULTILINE)
#        out, err = capsys.readouterr()
#        assert re.search(r'Error on response', out)
#        assert err == ''
#
#
#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart4_error(caplog, capsys):
#    """Test onResponseRequestCannedMessagePluginMessagePart4() with error"""
#
#    packet = {
#            'decoded': {
#                'routing': {
#                    'errorReason': 'some made up error',
#                    },
#                },
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart4(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart4', caplog.text, re.MULTILINE)
#        out, err = capsys.readouterr()
#        assert re.search(r'Error on response', out)
#        assert err == ''
#
#
#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart5_error(caplog, capsys):
#    """Test onResponseRequestCannedMessagePluginMessagePart5() with error"""
#
#    packet = {
#            'decoded': {
#                'routing': {
#                    'errorReason': 'some made up error',
#                    },
#                },
#            }
#
#    iface = MagicMock(autospec=SerialInterface)
#    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
#        anode = Node(mo, 'bar', noProto=True)
#        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
#        mo.localNode = anode
#
#        with caplog.at_level(logging.DEBUG):
#            anode.onResponseRequestCannedMessagePluginMessagePart5(packet)
#            assert re.search(r'onResponseRequestCannedMessagePluginMessagePart5', caplog.text, re.MULTILINE)
#        out, err = capsys.readouterr()
#        assert re.search(r'Error on response', out)
#        assert err == ''


@pytest.mark.unit
def test_onResponseRequestChannel(caplog):
    """Test onResponseRequestChannel()"""

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    msg1 = MagicMock(autospec=AdminMessage)
    msg1.get_channel_response = channel1

    msg2 = MagicMock(autospec=AdminMessage)
    channel2 = Channel(index=2, role=0) # disabled
    msg2.get_channel_response = channel2

    # default primary channel
    packet1 = {
        'from': 2475227164,
        'to': 2475227164,
        'decoded': {
            'portnum': 'ADMIN_APP',
            'payload': b':\t\x12\x05\x18\x03"\x01\x01\x18\x01',
            'requestId': 2615094405,
            'admin': {
                'getChannelResponse': {
                    'settings': {
                        'modemConfig': 'Bw125Cr48Sf4096',
                        'psk': 'AQ=='
                    },
                    'role': 'PRIMARY'
                },
                'raw': msg1,
            }
        },
        'id': 1692918436,
        'hopLimit': 3,
        'priority':
        'RELIABLE',
        'raw': 'fake',
        'fromId': '!9388f81c',
        'toId': '!9388f81c'
        }

    # no other channels
    packet2 = {
        'from': 2475227164,
        'to': 2475227164,
        'decoded': {
            'portnum': 'ADMIN_APP',
            'payload': b':\x04\x08\x02\x12\x00',
            'requestId': 743049663,
            'admin': {
                'getChannelResponse': {
                    'index': 2,
                    'settings': {}
                },
                'raw': msg2,
            }
        },
        'id': 1692918456,
        'rxTime': 1640202239,
        'hopLimit': 3,
        'priority': 'RELIABLE',
        'raw': 'faked',
        'fromId': '!9388f81c',
        'toId': '!9388f81c'
    }

    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        radioConfig = RadioConfig()
        anode.radioConfig = radioConfig

        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
        mo.localNode = anode

        with caplog.at_level(logging.DEBUG):
            anode.requestConfig()
            anode.onResponseRequestChannel(packet1)
            assert re.search(r'Received channel', caplog.text, re.MULTILINE)
            anode.onResponseRequestChannel(packet2)
            assert re.search(r'Received channel', caplog.text, re.MULTILINE)
            assert re.search(r'Finished downloading channels', caplog.text, re.MULTILINE)
            assert len(anode.channels) == 8
            assert anode.channels[0].settings.modem_config == 3
            assert anode.channels[1].settings.name == ''
            assert anode.channels[2].settings.name == ''
            assert anode.channels[3].settings.name == ''
            assert anode.channels[4].settings.name == ''
            assert anode.channels[5].settings.name == ''
            assert anode.channels[6].settings.name == ''
            assert anode.channels[7].settings.name == ''


@pytest.mark.unit
def test_onResponseRequestSetting(caplog):
    """Test onResponseRequestSetting()"""
    # Note: Split out the get_radio_response to a MagicMock
    # so it could be "returned" (not really sure how to do that
    # in a python dict.
    amsg = MagicMock(autospec=AdminMessage)
    amsg.get_radio_response = """{
  preferences {
    phone_timeout_secs: 900
    ls_secs: 300
    position_broadcast_smart: true
    position_flags: 35
  }
}"""
    packet = {
        'from': 2475227164,
        'to': 2475227164,
        'decoded': {
            'portnum': 'ADMIN_APP',
            'payload': b'*\x0e\n\x0c0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#',
            'requestId': 3145147848,
            'admin': {
                'getRadioResponse': {
                    'preferences': {
                        'phoneTimeoutSecs': 900,
                        'lsSecs': 300,
                        'positionBroadcastSmart': True,
                        'positionFlags': 35
                     }
                },
                'raw': amsg
            },
            'id': 365963704,
            'rxTime': 1640195197,
            'hopLimit': 3,
            'priority': 'RELIABLE',
            'raw': 'faked',
            'fromId': '!9388f81c',
            'toId': '!9388f81c'
        }
    }
    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        radioConfig = RadioConfig()
        anode.radioConfig = radioConfig

        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
        mo.localNode = anode

        with caplog.at_level(logging.DEBUG):
            anode.onResponseRequestSettings(packet)
            assert re.search(r'Received radio config, now fetching channels..', caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_onResponseRequestSetting_with_error(capsys):
    """Test onResponseRequestSetting() with an error"""
    packet = {
        'from': 2475227164,
        'to': 2475227164,
        'decoded': {
            'portnum': 'ADMIN_APP',
            'payload': b'*\x0e\n\x0c0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#',
            'requestId': 3145147848,
            'routing': {
                'errorReason': 'some made up error',
            },
            'admin': {
                'getRadioResponse': {
                    'preferences': {
                        'phoneTimeoutSecs': 900,
                        'lsSecs': 300,
                        'positionBroadcastSmart': True,
                        'positionFlags': 35
                     }
                },
            },
            'id': 365963704,
            'rxTime': 1640195197,
            'hopLimit': 3,
            'priority': 'RELIABLE',
            'fromId': '!9388f81c',
            'toId': '!9388f81c'
        }
    }
    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        radioConfig = RadioConfig()
        anode.radioConfig = radioConfig

        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
        mo.localNode = anode

        anode.onResponseRequestSettings(packet)
        out, err = capsys.readouterr()
        assert re.search(r'Error on response', out)
        assert err == ''


@pytest.mark.unitslow
def test_waitForConfig():
    """Test waitForConfig()"""
    anode = Node('foo', 'bar')
    radioConfig = RadioConfig()
    anode.radioConfig = radioConfig
    anode._timeout = Timeout(0.01)
    result = anode.waitForConfig()
    assert not result

Functions

def test_deleteChannel_secondary()

Try to delete a secondary channel.

Expand source code
@pytest.mark.unit
def test_deleteChannel_secondary():
    """Try to delete a secondary channel."""

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    channel2 = Channel(index=2, role=2)
    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    channel2.settings.name = 'testing'

    channel3 = Channel(index=3, role=0)
    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]


    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        anode.channels = channels
        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        assert channels[1].settings.name == 'testing'
        assert channels[2].settings.name == ''
        assert channels[3].settings.name == ''
        assert channels[4].settings.name == ''
        assert channels[5].settings.name == ''
        assert channels[6].settings.name == ''
        assert channels[7].settings.name == ''

        anode.deleteChannel(1)

        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        assert channels[1].settings.name == ''
        assert channels[2].settings.name == ''
        assert channels[3].settings.name == ''
        assert channels[4].settings.name == ''
        assert channels[5].settings.name == ''
        assert channels[6].settings.name == ''
        assert channels[7].settings.name == ''
def test_deleteChannel_secondary_with_admin_channel_after_testing()

Try to delete a secondary channel where there is an admin channel.

Expand source code
@pytest.mark.unit
def test_deleteChannel_secondary_with_admin_channel_after_testing():
    """Try to delete a secondary channel where there is an admin channel."""

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    channel2 = Channel(index=2, role=2)
    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    channel2.settings.name = 'testing'

    channel3 = Channel(index=3, role=2)
    channel3.settings.name = 'admin'

    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]


    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
        mo.localNode = anode

        assert mo.localNode == anode

        anode.channels = channels
        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        assert channels[1].settings.name == 'testing'
        assert channels[2].settings.name == 'admin'
        assert channels[3].settings.name == ''
        assert channels[4].settings.name == ''
        assert channels[5].settings.name == ''
        assert channels[6].settings.name == ''
        assert channels[7].settings.name == ''

        anode.deleteChannel(1)

        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        assert channels[1].settings.name == 'admin'
        assert channels[2].settings.name == ''
        assert channels[3].settings.name == ''
        assert channels[4].settings.name == ''
        assert channels[5].settings.name == ''
        assert channels[6].settings.name == ''
        assert channels[7].settings.name == ''
def test_deleteChannel_secondary_with_admin_channel_before_testing()

Try to delete a secondary channel where there is an admin channel.

Expand source code
@pytest.mark.unit
def test_deleteChannel_secondary_with_admin_channel_before_testing():
    """Try to delete a secondary channel where there is an admin channel."""

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    channel2 = Channel(index=2, role=2)
    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    channel2.settings.name = 'admin'

    channel3 = Channel(index=3, role=2)
    channel3.settings.name = 'testing'

    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]


    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        anode.channels = channels
        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        assert channels[1].settings.name == 'admin'
        assert channels[2].settings.name == 'testing'
        assert channels[3].settings.name == ''
        assert channels[4].settings.name == ''
        assert channels[5].settings.name == ''
        assert channels[6].settings.name == ''
        assert channels[7].settings.name == ''

        anode.deleteChannel(2)

        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        assert channels[1].settings.name == 'admin'
        assert channels[2].settings.name == ''
        assert channels[3].settings.name == ''
        assert channels[4].settings.name == ''
        assert channels[5].settings.name == ''
        assert channels[6].settings.name == ''
        assert channels[7].settings.name == ''
def test_deleteChannel_try_to_delete_primary_channel(capsys)

Try to delete primary channel.

Expand source code
@pytest.mark.unit
def test_deleteChannel_try_to_delete_primary_channel(capsys):
    """Try to delete primary channel."""
    anode = Node('foo', 'bar')

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    # no secondary channels
    channel2 = Channel(index=2, role=0)
    channel3 = Channel(index=3, role=0)
    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]

    anode.channels = channels
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode.deleteChannel(0)
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    assert re.search(r'Warning: Only SECONDARY channels can be deleted', out, re.MULTILINE)
    assert err == ''
def test_exitSimulator(caplog)

Test exitSimulator

Expand source code
@pytest.mark.unit
def test_exitSimulator(caplog):
    """Test exitSimulator"""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.exitSimulator()
    assert re.search(r'in exitSimulator', caplog.text, re.MULTILINE)
def test_getAdminChannelIndex()

Get the 'admin' channel index.

Expand source code
@pytest.mark.unit
def test_getAdminChannelIndex():
    """Get the 'admin' channel index."""
    anode = Node('foo', 'bar')

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    channel2 = Channel(index=2, role=2)
    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    channel2.settings.name = 'admin'

    channel3 = Channel(index=3, role=0)
    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]

    anode.channels = channels
    i = anode._getAdminChannelIndex()
    assert i == 2
def test_getAdminChannelIndex_when_no_admin_named_channel()

Get the 'admin' channel when there is not one.

Expand source code
@pytest.mark.unit
def test_getAdminChannelIndex_when_no_admin_named_channel():
    """Get the 'admin' channel when there is not one."""
    anode = Node('foo', 'bar')

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    channel2 = Channel(index=2, role=0)
    channel3 = Channel(index=3, role=0)
    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]

    anode.channels = channels
    i = anode._getAdminChannelIndex()
    assert i == 0
def test_getChannelByChannelIndex()

Test getChannelByChannelIndex()

Expand source code
@pytest.mark.unit
def test_getChannelByChannelIndex():
    """Test getChannelByChannelIndex()"""
    anode = Node('foo', 'bar')

    channel1 = Channel(index=1, role=1) # primary channel
    channel2 = Channel(index=2, role=2) # secondary channel
    channel3 = Channel(index=3, role=0)
    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]

    anode.channels = channels

    # test primary
    assert anode.getChannelByChannelIndex(0) is not None
    # test secondary
    assert anode.getChannelByChannelIndex(1) is not None
    # test disabled
    assert anode.getChannelByChannelIndex(2) is not None
    # test invalid values
    assert anode.getChannelByChannelIndex(-1) is None
    assert anode.getChannelByChannelIndex(9) is None
def test_getChannelByName()

Get a channel by the name.

Expand source code
@pytest.mark.unit
def test_getChannelByName():
    """Get a channel by the name."""
    anode = Node('foo', 'bar')

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    channel2 = Channel(index=2, role=2)
    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    channel2.settings.name = 'admin'

    channel3 = Channel(index=3, role=0)
    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]

    anode.channels = channels
    ch = anode.getChannelByName('admin')
    assert ch.index == 2
def test_getChannelByName_invalid_name()

Get a channel by the name but one that is not present.

Expand source code
@pytest.mark.unit
def test_getChannelByName_invalid_name():
    """Get a channel by the name but one that is not present."""
    anode = Node('foo', 'bar')

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    channel2 = Channel(index=2, role=2)
    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    channel2.settings.name = 'admin'

    channel3 = Channel(index=3, role=0)
    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]

    anode.channels = channels
    ch = anode.getChannelByName('testing')
    assert ch is None
def test_getDisabledChannel()

Get the first disabled channel.

Expand source code
@pytest.mark.unit
def test_getDisabledChannel():
    """Get the first disabled channel."""
    anode = Node('foo', 'bar')

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    channel2 = Channel(index=2, role=2)
    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    channel2.settings.name = 'testingA'

    channel3 = Channel(index=3, role=2)
    channel3.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    channel3.settings.name = 'testingB'

    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]

    anode.channels = channels
    ch = anode.getDisabledChannel()
    assert ch.index == 4
def test_getDisabledChannel_where_all_channels_are_used()

Get the first disabled channel.

Expand source code
@pytest.mark.unit
def test_getDisabledChannel_where_all_channels_are_used():
    """Get the first disabled channel."""
    anode = Node('foo', 'bar')

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    channel2 = Channel(index=2, role=2)
    channel3 = Channel(index=3, role=2)
    channel4 = Channel(index=4, role=2)
    channel5 = Channel(index=5, role=2)
    channel6 = Channel(index=6, role=2)
    channel7 = Channel(index=7, role=2)
    channel8 = Channel(index=8, role=2)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]

    anode.channels = channels
    ch = anode.getDisabledChannel()
    assert ch is None
def test_node(capsys)

Test that we can instantiate a Node

Expand source code
@pytest.mark.unit
def test_node(capsys):
    """Test that we can instantiate a Node"""
    anode = Node('foo', 'bar')
    radioConfig = RadioConfig()
    anode.radioConfig = radioConfig
    anode.showChannels()
    anode.showInfo()
    out, err = capsys.readouterr()
    assert re.search(r'Preferences', out)
    assert re.search(r'Channels', out)
    assert re.search(r'Primary channel URL', out)
    assert err == ''
def test_node_requestConfig(capsys)

Test run requestConfig

Expand source code
@pytest.mark.unit
def test_node_requestConfig(capsys):
    """Test run requestConfig"""
    iface = MagicMock(autospec=SerialInterface)
    amesg = MagicMock(autospec=AdminMessage)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
            anode = Node(mo, 'bar')
            anode.requestConfig()
    out, err = capsys.readouterr()
    assert re.search(r'Requesting preferences from remote node', out, re.MULTILINE)
    assert err == ''
def test_onResponseRequestChannel(caplog)

Test onResponseRequestChannel()

Expand source code
@pytest.mark.unit
def test_onResponseRequestChannel(caplog):
    """Test onResponseRequestChannel()"""

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    msg1 = MagicMock(autospec=AdminMessage)
    msg1.get_channel_response = channel1

    msg2 = MagicMock(autospec=AdminMessage)
    channel2 = Channel(index=2, role=0) # disabled
    msg2.get_channel_response = channel2

    # default primary channel
    packet1 = {
        'from': 2475227164,
        'to': 2475227164,
        'decoded': {
            'portnum': 'ADMIN_APP',
            'payload': b':\t\x12\x05\x18\x03"\x01\x01\x18\x01',
            'requestId': 2615094405,
            'admin': {
                'getChannelResponse': {
                    'settings': {
                        'modemConfig': 'Bw125Cr48Sf4096',
                        'psk': 'AQ=='
                    },
                    'role': 'PRIMARY'
                },
                'raw': msg1,
            }
        },
        'id': 1692918436,
        'hopLimit': 3,
        'priority':
        'RELIABLE',
        'raw': 'fake',
        'fromId': '!9388f81c',
        'toId': '!9388f81c'
        }

    # no other channels
    packet2 = {
        'from': 2475227164,
        'to': 2475227164,
        'decoded': {
            'portnum': 'ADMIN_APP',
            'payload': b':\x04\x08\x02\x12\x00',
            'requestId': 743049663,
            'admin': {
                'getChannelResponse': {
                    'index': 2,
                    'settings': {}
                },
                'raw': msg2,
            }
        },
        'id': 1692918456,
        'rxTime': 1640202239,
        'hopLimit': 3,
        'priority': 'RELIABLE',
        'raw': 'faked',
        'fromId': '!9388f81c',
        'toId': '!9388f81c'
    }

    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        radioConfig = RadioConfig()
        anode.radioConfig = radioConfig

        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
        mo.localNode = anode

        with caplog.at_level(logging.DEBUG):
            anode.requestConfig()
            anode.onResponseRequestChannel(packet1)
            assert re.search(r'Received channel', caplog.text, re.MULTILINE)
            anode.onResponseRequestChannel(packet2)
            assert re.search(r'Received channel', caplog.text, re.MULTILINE)
            assert re.search(r'Finished downloading channels', caplog.text, re.MULTILINE)
            assert len(anode.channels) == 8
            assert anode.channels[0].settings.modem_config == 3
            assert anode.channels[1].settings.name == ''
            assert anode.channels[2].settings.name == ''
            assert anode.channels[3].settings.name == ''
            assert anode.channels[4].settings.name == ''
            assert anode.channels[5].settings.name == ''
            assert anode.channels[6].settings.name == ''
            assert anode.channels[7].settings.name == ''
def test_onResponseRequestSetting(caplog)

Test onResponseRequestSetting()

Expand source code
@pytest.mark.unit
def test_onResponseRequestSetting(caplog):
    """Test onResponseRequestSetting()"""
    # Note: Split out the get_radio_response to a MagicMock
    # so it could be "returned" (not really sure how to do that
    # in a python dict.
    amsg = MagicMock(autospec=AdminMessage)
    amsg.get_radio_response = """{
  preferences {
    phone_timeout_secs: 900
    ls_secs: 300
    position_broadcast_smart: true
    position_flags: 35
  }
}"""
    packet = {
        'from': 2475227164,
        'to': 2475227164,
        'decoded': {
            'portnum': 'ADMIN_APP',
            'payload': b'*\x0e\n\x0c0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#',
            'requestId': 3145147848,
            'admin': {
                'getRadioResponse': {
                    'preferences': {
                        'phoneTimeoutSecs': 900,
                        'lsSecs': 300,
                        'positionBroadcastSmart': True,
                        'positionFlags': 35
                     }
                },
                'raw': amsg
            },
            'id': 365963704,
            'rxTime': 1640195197,
            'hopLimit': 3,
            'priority': 'RELIABLE',
            'raw': 'faked',
            'fromId': '!9388f81c',
            'toId': '!9388f81c'
        }
    }
    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        radioConfig = RadioConfig()
        anode.radioConfig = radioConfig

        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
        mo.localNode = anode

        with caplog.at_level(logging.DEBUG):
            anode.onResponseRequestSettings(packet)
            assert re.search(r'Received radio config, now fetching channels..', caplog.text, re.MULTILINE)
def test_onResponseRequestSetting_with_error(capsys)

Test onResponseRequestSetting() with an error

Expand source code
@pytest.mark.unit
def test_onResponseRequestSetting_with_error(capsys):
    """Test onResponseRequestSetting() with an error"""
    packet = {
        'from': 2475227164,
        'to': 2475227164,
        'decoded': {
            'portnum': 'ADMIN_APP',
            'payload': b'*\x0e\n\x0c0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#',
            'requestId': 3145147848,
            'routing': {
                'errorReason': 'some made up error',
            },
            'admin': {
                'getRadioResponse': {
                    'preferences': {
                        'phoneTimeoutSecs': 900,
                        'lsSecs': 300,
                        'positionBroadcastSmart': True,
                        'positionFlags': 35
                     }
                },
            },
            'id': 365963704,
            'rxTime': 1640195197,
            'hopLimit': 3,
            'priority': 'RELIABLE',
            'fromId': '!9388f81c',
            'toId': '!9388f81c'
        }
    }
    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        radioConfig = RadioConfig()
        anode.radioConfig = radioConfig

        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
        mo.localNode = anode

        anode.onResponseRequestSettings(packet)
        out, err = capsys.readouterr()
        assert re.search(r'Error on response', out)
        assert err == ''
def test_reboot(caplog)

Test reboot

Expand source code
@pytest.mark.unit
def test_reboot(caplog):
    """Test reboot"""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.reboot()
    assert re.search(r'Telling node to reboot', caplog.text, re.MULTILINE)
def test_requestChannel_localNode(caplog)

Test _requestChannel()

Expand source code
@pytest.mark.unit
def test_requestChannel_localNode(caplog):
    """Test _requestChannel()"""
    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
        mo.localNode = anode

        with caplog.at_level(logging.DEBUG):
            anode._requestChannel(0)
            assert re.search(r'Requesting channel 0', caplog.text, re.MULTILINE)
            assert not re.search(r'from remote node', caplog.text, re.MULTILINE)
def test_requestChannel_not_localNode(caplog, capsys)

Test _requestChannel()

Expand source code
@pytest.mark.unit
def test_requestChannel_not_localNode(caplog, capsys):
    """Test _requestChannel()"""
    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)
        with caplog.at_level(logging.DEBUG):
            anode._requestChannel(0)
            assert re.search(r'Requesting channel 0 info from remote node', caplog.text, re.MULTILINE)
        out, err = capsys.readouterr()
        assert re.search(r'Requesting channel 0 info', out, re.MULTILINE)
        assert err == ''
def test_setOwnerShort(caplog)

Test setOwner

Expand source code
@pytest.mark.unit
def test_setOwnerShort(caplog):
    """Test setOwner"""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.setOwner(long_name=None, short_name='123')
    assert re.search(r'p.set_owner.short_name:123:', caplog.text, re.MULTILINE)
def test_setOwner_and_team(caplog)

Test setOwner

Expand source code
@pytest.mark.unit
def test_setOwner_and_team(caplog):
    """Test setOwner"""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.setOwner(long_name ='Test123', short_name='123', team=1)
    assert re.search(r'p.set_owner.long_name:Test123:', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.short_name:123:', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.is_licensed:False', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.team:1', caplog.text, re.MULTILINE)
def test_setOwner_long_name_no_short(caplog)

Test setOwner

Expand source code
@pytest.mark.unit
def test_setOwner_long_name_no_short(caplog):
    """Test setOwner"""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.setOwner(long_name ='Aabo', is_licensed=True)
    assert re.search(r'p.set_owner.long_name:Aabo:', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.short_name:Aab:', caplog.text, re.MULTILINE)
def test_setOwner_no_short_name(caplog)

Test setOwner

Expand source code
@pytest.mark.unit
def test_setOwner_no_short_name(caplog):
    """Test setOwner"""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.setOwner(long_name ='Test123')
    assert re.search(r'p.set_owner.long_name:Test123:', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.short_name:Tst:', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.is_licensed:False', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.team:0', caplog.text, re.MULTILINE)
def test_setOwner_no_short_name_and_long_name_has_words(caplog)

Test setOwner

Expand source code
@pytest.mark.unit
def test_setOwner_no_short_name_and_long_name_has_words(caplog):
    """Test setOwner"""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.setOwner(long_name ='A B C', is_licensed=True)
    assert re.search(r'p.set_owner.long_name:A B C:', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.short_name:ABC:', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.is_licensed:True', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.team:0', caplog.text, re.MULTILINE)
def test_setOwner_no_short_name_and_long_name_is_short(caplog)

Test setOwner

Expand source code
@pytest.mark.unit
def test_setOwner_no_short_name_and_long_name_is_short(caplog):
    """Test setOwner"""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.setOwner(long_name ='Tnt')
    assert re.search(r'p.set_owner.long_name:Tnt:', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.short_name:Tnt:', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.is_licensed:False', caplog.text, re.MULTILINE)
    assert re.search(r'p.set_owner.team:0', caplog.text, re.MULTILINE)
def test_setURL_empty_url(capsys)

Test reboot

Expand source code
@pytest.mark.unit
def test_setURL_empty_url(capsys):
    """Test reboot"""
    anode = Node('foo', 'bar', noProto=True)
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode.setURL('')
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    assert re.search(r'Warning: No RadioConfig has been read', out, re.MULTILINE)
    assert err == ''
def test_setURL_valid_URL(caplog)

Test setURL

Expand source code
@pytest.mark.unit
def test_setURL_valid_URL(caplog):
    """Test setURL"""
    iface = MagicMock(autospec=SerialInterface)
    url = "https://www.meshtastic.org/d/#CgUYAyIBAQ"
    with caplog.at_level(logging.DEBUG):
        anode = Node(iface, 'bar', noProto=True)
        anode.radioConfig = 'baz'
        channels = ['zoo']
        anode.channels = channels
        anode.setURL(url)
    assert re.search(r'Channel i:0', caplog.text, re.MULTILINE)
    assert re.search(r'modem_config: MidSlow', caplog.text, re.MULTILINE)
    assert re.search(r'psk: "\\001"', caplog.text, re.MULTILINE)
    assert re.search(r'role: PRIMARY', caplog.text, re.MULTILINE)
def test_setURL_valid_URL_but_no_settings(capsys)

Test setURL

Expand source code
@pytest.mark.unit
def test_setURL_valid_URL_but_no_settings(capsys):
    """Test setURL"""
    iface = MagicMock(autospec=SerialInterface)
    url = "https://www.meshtastic.org/d/#"
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode = Node(iface, 'bar', noProto=True)
        anode.radioConfig = 'baz'
        anode.setURL(url)
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    assert re.search(r'Warning: There were no settings', out, re.MULTILINE)
    assert err == ''
def test_showChannels(capsys)

Test showChannels

Expand source code
@pytest.mark.unit
def test_showChannels(capsys):
    """Test showChannels"""
    anode = Node('foo', 'bar')

    # primary channel
    # role: 0=Disabled, 1=Primary, 2=Secondary
    # modem_config: 0-5
    # role: 0=Disabled, 1=Primary, 2=Secondary
    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    channel2 = Channel(index=2, role=2)
    channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    channel2.settings.name = 'testing'

    channel3 = Channel(index=3, role=0)
    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]

    anode.channels = channels
    anode.showChannels()
    out, err = capsys.readouterr()
    assert re.search(r'Channels:', out, re.MULTILINE)
    # primary channel
    assert re.search(r'Primary channel URL', out, re.MULTILINE)
    assert re.search(r'PRIMARY psk=default ', out, re.MULTILINE)
    assert re.search(r'"modemConfig": "MidSlow"', out, re.MULTILINE)
    assert re.search(r'"psk": "AQ=="', out, re.MULTILINE)
    # secondary channel
    assert re.search(r'SECONDARY psk=secret ', out, re.MULTILINE)
    assert re.search(r'"psk": "ipR5DsbJHjWREkCmMKi0M4cA8ksO539Bes31sJAwqDQ="', out, re.MULTILINE)
    assert err == ''
def test_shutdown(caplog)

Test shutdown

Expand source code
@pytest.mark.unit
def test_shutdown(caplog):
    """Test shutdown"""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.shutdown()
    assert re.search(r'Telling node to shutdown', caplog.text, re.MULTILINE)
def test_turnOffEncryptionOnPrimaryChannel(capsys)

Turn off encryption when there is a psk.

Expand source code
@pytest.mark.unit
def test_turnOffEncryptionOnPrimaryChannel(capsys):
    """Turn off encryption when there is a psk."""
    anode = Node('foo', 'bar', noProto=True)

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    # value from using "--ch-set psk 0x1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b "
    channel1.settings.psk = b'\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++'

    channel2 = Channel(index=2, role=0)
    channel3 = Channel(index=3, role=0)
    channel4 = Channel(index=4, role=0)
    channel5 = Channel(index=5, role=0)
    channel6 = Channel(index=6, role=0)
    channel7 = Channel(index=7, role=0)
    channel8 = Channel(index=8, role=0)

    channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]

    anode.channels = channels
    anode.turnOffEncryptionOnPrimaryChannel()
    out, err = capsys.readouterr()
    assert re.search(r'Writing modified channels to device', out)
    assert err == ''
def test_waitForConfig()

Test waitForConfig()

Expand source code
@pytest.mark.unitslow
def test_waitForConfig():
    """Test waitForConfig()"""
    anode = Node('foo', 'bar')
    radioConfig = RadioConfig()
    anode.radioConfig = radioConfig
    anode._timeout = Timeout(0.01)
    result = anode.waitForConfig()
    assert not result
def test_writeConfig(caplog)

Test writeConfig

Expand source code
@pytest.mark.unit
def test_writeConfig(caplog):
    """Test writeConfig"""
    anode = Node('foo', 'bar', noProto=True)
    radioConfig = RadioConfig()
    anode.radioConfig = radioConfig

    with caplog.at_level(logging.DEBUG):
        anode.writeConfig()
    assert re.search(r'Wrote config', caplog.text, re.MULTILINE)
def test_writeConfig_with_no_radioConfig(capsys)

Test writeConfig with no radioConfig.

Expand source code
@pytest.mark.unit
def test_writeConfig_with_no_radioConfig(capsys):
    """Test writeConfig with no radioConfig."""
    anode = Node('foo', 'bar', noProto=True)

    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode.writeConfig()
    assert pytest_wrapped_e.type == SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    assert re.search(r'Error: No RadioConfig has been read', out)
    assert err == ''

Classes

class AdminMessage (*args, **kwargs)

A ProtocolMessage

Ancestors

  • google.protobuf.pyext._message.CMessage
  • google.protobuf.message.Message

Class variables

var DESCRIPTOR

Instance variables

var confirm_set_channel

Field AdminMessage.confirm_set_channel

var confirm_set_radio

Field AdminMessage.confirm_set_radio

var exit_simulator

Field AdminMessage.exit_simulator

var get_canned_message_module_part1_request

Field AdminMessage.get_canned_message_module_part1_request

var get_canned_message_module_part1_response

Field AdminMessage.get_canned_message_module_part1_response

var get_canned_message_module_part2_request

Field AdminMessage.get_canned_message_module_part2_request

var get_canned_message_module_part2_response

Field AdminMessage.get_canned_message_module_part2_response

var get_canned_message_module_part3_request

Field AdminMessage.get_canned_message_module_part3_request

var get_canned_message_module_part3_response

Field AdminMessage.get_canned_message_module_part3_response

var get_canned_message_module_part4_request

Field AdminMessage.get_canned_message_module_part4_request

var get_canned_message_module_part4_response

Field AdminMessage.get_canned_message_module_part4_response

var get_channel_request

Field AdminMessage.get_channel_request

var get_channel_response

Field AdminMessage.get_channel_response

var get_owner_request

Field AdminMessage.get_owner_request

var get_owner_response

Field AdminMessage.get_owner_response

var get_radio_request

Field AdminMessage.get_radio_request

var get_radio_response

Field AdminMessage.get_radio_response

var reboot_seconds

Field AdminMessage.reboot_seconds

var set_canned_message_module_part1

Field AdminMessage.set_canned_message_module_part1

var set_canned_message_module_part2

Field AdminMessage.set_canned_message_module_part2

var set_canned_message_module_part3

Field AdminMessage.set_canned_message_module_part3

var set_canned_message_module_part4

Field AdminMessage.set_canned_message_module_part4

var set_channel

Field AdminMessage.set_channel

var set_owner

Field AdminMessage.set_owner

var set_radio

Field AdminMessage.set_radio

var shutdown_seconds

Field AdminMessage.shutdown_seconds

class Channel (*args, **kwargs)

A ProtocolMessage

Ancestors

  • google.protobuf.pyext._message.CMessage
  • google.protobuf.message.Message

Class variables

var DESCRIPTOR
var DISABLED
var PRIMARY
var Role
var SECONDARY

Instance variables

var index

Field Channel.index

var role

Field Channel.role

var settings

Field Channel.settings

class RadioConfig (*args, **kwargs)

A ProtocolMessage

Ancestors

  • google.protobuf.pyext._message.CMessage
  • google.protobuf.message.Message

Class variables

var DESCRIPTOR
var UserPreferences

A ProtocolMessage

Instance variables

var preferences

Field RadioConfig.preferences