21. 03. 2025 Andrea Mariani NetEye, Unified Monitoring

How to Create a Serial Modem Emulation Service on NetEye

In some test or development environments, you may need to simulate the presence of GSM modems without having an actual physical device. This can be useful, for example, when testing monitoring checks, SMS management systems, or new notification rules.

In this post I’ll show you how I created a serial modem emulation service. The service creates one or more virtual modems, as needed, that respond to AT commands, and it lets you simulate state changes for each modem over a direct TCP connection. Everything runs as a systemd service.

Creating the Emulation Script

The core of the service is a Python script that creates the virtual modems and answers the AT commands they receive. Create a new directory called “serial_modem_emulator” under “/neteye/shared/monitoring/plugins/” and, inside it, a file named “serial_modem_emulator.py”.

# mkdir /neteye/shared/monitoring/plugins/serial_modem_emulator
# cd /neteye/shared/monitoring/plugins/serial_modem_emulator
# vim serial_modem_emulator.py

We now enter the following code:

import os
import sys
import pty
import json
import threading
import socket

# Prefix of the symlinks that expose the virtual modems
DEV_PREFIX = "/dev/ttys"

# Default responses to AT commands
DEFAULT_RESPONSES = {
    "AT": "OK\r\n",
    "ATI": "+CGS: 1.12.0\r\nOK\r\n",
    "AT+CSQ": "+CSQ: 20,99\r\nOK\r\n",
    "AT+CREG?": "+CREG: 1,1\r\nOK\r\n",
    "AT+CGATT?": "+CGATT: 1\r\nOK\r\n",
    "AT+CMEE": "OK\r\n",
    "AT+CNMI": "OK\r\n",
    "AT+COPS?": "+COPS: 0,0,\"Operator Name\",2\r\nOK\r\n",
    "AT+CSQ?": "+CSQ: 20,99\r\nOK\r\n",
    "AT+CPIN?": "+CPIN: READY\r\nOK\r\n",
    "AT+CMGF=0": "OK\r\n",  # Select PDU mode for SMS
    "AT+CMGF=1": "OK\r\n",  # Select text mode for SMS
    "ATE0": "OK\r\n",  # Disable echo
    "AT+CMEE=1": "OK\r\n",  # Enable detailed error messages
    "AT+CREG=2": "OK\r\n",  # Enable network registration notifications
    "AT+CMGL=\"ALL\"": "OK\r\n",
    "AT+CMGR=1": "+CMGR: \"REC UNREAD\",\"+1234567890\",\"\",\"23/03/06,17:15:00\"\r\nMessaggio di test\r\nOK\r\n",
    "AT+CMGS=\"+1234567890\"": "> ",
    "ATE0+CMEE=1;+CREG=2": "OK\r\n",  # Answers all three commands sent as a single string
    "AT+CIMI": "123456789012345\r\nOK\r\n",  # IMSI identification
    "AT+CGSN": "123456789012345\r\nOK\r\n",  # Serial number (IMEI)
    "AT+CPMS?": "+CPMS: \"SM\",0,30,\"SM\",0,30,\"SM\",0,30\r\nOK\r\n"  # SMS memory status
}

# Simulated state for each modem
simulation_state = {}

# Initialize the state of every modem with the default responses
def initialize_simulation_state(modem_count):
    global simulation_state
    for i in range(modem_count):
        simulation_state[i] = DEFAULT_RESPONSES.copy()  # Each modem gets its own copy of the defaults

# Handle AT commands for a specific modem
def handle_at_command(command, modem_index):
    command = command.strip()

    # Look up the requested command in this modem's simulated state
    if command in simulation_state[modem_index]:
        return simulation_state[modem_index][command]

    # Unrecognized command
    return "ERROR\r\n"

# Create a virtual serial device and answer AT commands on it
def create_serial_device(index):
    try:
        master, slave = pty.openpty()
        slave_name = os.ttyname(slave)
        symlink_path = f"{DEV_PREFIX}{index}"

        # Remove any old symlink (lexists also detects dangling links
        # left behind when the previous pty no longer exists)
        if os.path.lexists(symlink_path):
            os.unlink(symlink_path)

        os.symlink(slave_name, symlink_path)
        print(f"Device seriale creato: {symlink_path} -> {slave_name}")

        while True:
            command = os.read(master, 1024).decode()
            response = handle_at_command(command, index)
            os.write(master, response.encode())
    except Exception as e:
        print(f"Errore nella gestione del modem seriale {index}: {e}")

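# Control server used to change the simulated state at runtime
def control_server():
    host = "127.0.0.1"
    port = 5000

    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow an immediate re-bind after a restart
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((host, port))
    server_socket.listen(1)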
    print(f"Server di controllo in ascolto su {host}:{port}")

    while True:
        client_socket, _ = server_socket.accept()
        command = client_socket.recv(1024).decode().strip()

        try:
            # Command format: "modem_index|set_state|AT_command|new_response"
            # or "modem_index|reset"
            if "|" in command:
                parts = command.split("|")
                modem_index = int(parts[0])

                if parts[1] == "set_state":
                    at_command = parts[2]
                    new_response = parts[3]
                    simulation_state[modem_index][at_command] = new_response
                    client_socket.sendall(f"Stato del comando {at_command} per modem {modem_index} impostato su: {new_response}\n".encode())

                elif parts[1] == "reset":
                    simulation_state[modem_index] = DEFAULT_RESPONSES.copy()
                    client_socket.sendall(f"Stato del modem {modem_index} ripristinato ai valori di default.\n".encode())

                else:
                    client_socket.sendall(b"Comando non riconosciuto.\n")
            else:
                client_socket.sendall(b"Formato comando non valido.\n")

        except Exception as e:
            client_socket.sendall(f"Errore: {e}\n".encode())

        client_socket.close()

# Start the serial modem service
def start_modems():
    config_file = "/neteye/shared/monitoring/plugins/serial_modem_emulator/modem_config.json"
    if not os.path.exists(config_file):
        print(f"File di configurazione {config_file} non trovato.")
        sys.exit(1)

    with open(config_file, "r") as f:
        config = json.load(f)

    serial_modems = [m for m in config["modems"] if m["type"] == "Serial"]
    modem_count = len(serial_modems)
    initialize_simulation_state(modem_count)

    threads = []
    for i, _ in enumerate(serial_modems):
        t = threading.Thread(target=create_serial_device, args=(i,))
        t.start()
        threads.append(t)

    threading.Thread(target=control_server, daemon=True).start()

    for t in threads:
        t.join()

# Stop the serial modem service
def stop_modems():
    index = 0
    while True:
        symlink_path = f"{DEV_PREFIX}{index}"
        # lexists also matches symlinks whose pty target is already gone
        if os.path.lexists(symlink_path):
            os.unlink(symlink_path)
            print(f"Device {symlink_path} rimosso.")
        else:
            break
        index += 1

if __name__ == "__main__":
    if len(sys.argv) != 2 or sys.argv[1] not in ["start", "stop", "restart"]:
        print("Usage: script.py [start|stop|restart]")
        sys.exit(1)

    if sys.argv[1] == "stop":
        stop_modems()
    elif sys.argv[1] == "start":
        start_modems()
    elif sys.argv[1] == "restart":
        stop_modems()
        start_modems()
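
The lookup logic can be checked in isolation, without touching /dev. The following is a minimal sketch rather than part of the script above: `make_state` and `answer` are hypothetical stand-ins for `initialize_simulation_state` and `handle_at_command`, and the table is trimmed to two entries. It illustrates that each modem gets its own copy of the defaults, so overriding a response for one modem leaves the others untouched.

```python
# Trimmed-down copy of the default table (illustrative subset only)
DEFAULTS = {
    "AT": "OK\r\n",
    "AT+CSQ": "+CSQ: 20,99\r\nOK\r\n",
}

def make_state(modem_count):
    """Return one independent response table per modem index."""
    return {i: DEFAULTS.copy() for i in range(modem_count)}

def answer(state, modem_index, raw_command):
    """Look up the reply for a command; unknown commands yield ERROR."""
    return state[modem_index].get(raw_command.strip(), "ERROR\r\n")

state = make_state(2)
state[0]["AT+CSQ"] = "+CSQ: 0,99\r\nOK\r\n"  # override modem 0 only

print(repr(answer(state, 0, "AT+CSQ\r")))  # overridden reply
print(repr(answer(state, 1, "AT+CSQ\r")))  # still the default
print(repr(answer(state, 1, "AT+FOO\r")))  # unknown command
```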

Creating the Configuration File

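Next, create the configuration file that determines how many virtual modems are available for the simulations. For each entry of type “Serial”, the script creates a device named “/dev/ttysX”, where X is assigned sequentially starting from 0. Note that the script as written hardcodes the “/dev/ttys” prefix and only reads the “type” field, so “device_prefix” is informational.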

# vim /neteye/shared/monitoring/plugins/serial_modem_emulator/modem_config.json

In it, define the configuration, in this case for two modems:

{
  "modems": [
    {
      "type": "Serial",
      "device_prefix": "/dev/ttys"
    },
    {
      "type": "Serial",
      "device_prefix": "/dev/ttys"
    }
  ]
}
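
Before starting the service, it can help to see which devices a given configuration would produce. This is a small sketch under the same assumptions as the script (only entries with "type": "Serial" count, and devices are numbered from 0); `planned_devices` is a hypothetical helper, not something the emulator provides.

```python
import json

def planned_devices(config_text, prefix="/dev/ttys"):
    """List the device paths the emulator would create for this config."""
    config = json.loads(config_text)
    serial = [m for m in config["modems"] if m.get("type") == "Serial"]
    return [f"{prefix}{i}" for i in range(len(serial))]

# Same content as the modem_config.json shown above
example = """
{"modems": [{"type": "Serial", "device_prefix": "/dev/ttys"},
            {"type": "Serial", "device_prefix": "/dev/ttys"}]}
"""

print(planned_devices(example))
```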

Creating the Service

To finish up, and have our virtual modems configured and working, all that remains is to create the service we’ll use to manage these new devices:

# vim /etc/systemd/system/serial-modem-emulator.service
[Unit]
Description=Emulatore Modem GSM Seriale
After=network.target

[Service]
Type=simple
WorkingDirectory=/neteye/shared/monitoring/plugins/serial_modem_emulator
ExecStart=/usr/bin/python3 /neteye/shared/monitoring/plugins/serial_modem_emulator/serial_modem_emulator.py start
ExecStop=/usr/bin/python3 /neteye/shared/monitoring/plugins/serial_modem_emulator/serial_modem_emulator.py stop
Restart=on-failure
RestartSec=5
User=root
Group=root

[Install]
WantedBy=multi-user.target

With that created, let’s start the service:

# systemctl daemon-reload
# systemctl enable serial-modem-emulator
# systemctl start serial-modem-emulator

Verify the Service

We can check that the service is running with the command:

# systemctl status serial-modem-emulator

Or check whether the new devices were created correctly:

# ll /dev/ttys*
lrwxrwxrwx. 1 root root 10 Mar 20 17:54 /dev/ttys0 -> /dev/pts/1
lrwxrwxrwx. 1 root root 10 Mar 20 17:54 /dev/ttys1 -> /dev/pts/2
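
To confirm that a device actually answers, you can also send it an AT command by hand. The sketch below is one possible way to do that from Python on Linux, assuming no other program (such as smsd) is using the device at the same time; `query_modem` is a hypothetical helper. It switches the terminal to raw mode first, so that the reply is not echoed back to the emulator and not line-buffered.

```python
import os
import select
import tty

def query_modem(device_path, command, timeout=2.0):
    """Send one AT command to a serial device and return the reply as text."""
    fd = os.open(device_path, os.O_RDWR | os.O_NOCTTY)
    try:
        tty.setraw(fd)  # raw mode: no echo, no line buffering
        os.write(fd, (command + "\r").encode())
        reply = bytearray()
        # Read until a final result code arrives or the timeout expires
        while not reply.endswith((b"OK\r\n", b"ERROR\r\n")):
            ready, _, _ = select.select([fd], [], [], timeout)
            if not ready:
                break
            chunk = os.read(fd, 1024)
            if not chunk:
                break
            reply += chunk
        return reply.decode(errors="replace")
    finally:
        os.close(fd)
```

With the emulator running, a call such as query_modem("/dev/ttys0", "AT+CSQ") should return the configured +CSQ response followed by OK.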

Configuring the SMSD Service

Now one might ask, “But why create a service that simulates one or more GSM modems?” In my case I wanted to test the HA functionality offered by the smsd service, and since I didn’t have two physical modems, I opted for this solution.

For example, I modified the configuration of the smsd service like this and then restarted the service:

# vim /neteye/local/smsd/conf/smsd.conf
devices = GSM1, GSM2
logfile = /neteye/local/smsd/log/smstools.log
loglevel = 7
failed = /neteye/local/smsd/data/spool/failed
incoming = /neteye/local/smsd/data/spool/incoming
checked = /neteye/local/smsd/data/spool/checked
outgoing = /neteye/local/smsd/data/spool/outgoing

[HA]
failover = true
monitor_interval = 60  # Check every minute

[ROUTER]
rule = {
    if $GSM1.available then route to GSM1;
    else if $GSM2.available then route to GSM2;
}

[GSM1]
device = /dev/ttys0
baudrate = 115200
mode = new
incoming = yes
cs_convert = yes

[GSM2]
device = /dev/ttys1
baudrate = 115200
mode = new
incoming = yes
cs_convert = yes

With the emulator service active and answering the AT commands that smsd sends to the devices, the smsd log stays clean, as if both devices were working properly:

# tail -f /neteye/local/smsd/log/smstools.log
2025-03-21 18:48:06,7, GSM2: -> AT+CPMS?
2025-03-21 18:48:06,7, GSM2: Command is sent, waiting for the answer
2025-03-21 18:48:06,7, GSM1: -> AT+CPMS?
2025-03-21 18:48:06,7, GSM1: Command is sent, waiting for the answer
2025-03-21 18:48:07,7, GSM2: <- +CPMS: "SM",0,30,"SM",0,30,"SM",0,30 OK
2025-03-21 18:48:07,6, GSM2: Used memory is 0 of 30
2025-03-21 18:48:07,6, GSM2: No SMS received
2025-03-21 18:48:07,7, GSM1: <- +CPMS: "SM",0,30,"SM",0,30,"SM",0,30 OK
2025-03-21 18:48:07,6, GSM1: Used memory is 0 of 30
2025-03-21 18:48:07,6, GSM1: No SMS received
2025-03-21 18:48:17,6, GSM2: Checking device for incoming SMS
2025-03-21 18:48:17,6, GSM2: Checking if modem is ready
2025-03-21 18:48:17,6, GSM1: Checking device for incoming SMS
2025-03-21 18:48:17,6, GSM1: Checking if modem is ready
2025-03-21 18:48:17,7, GSM2: -> AT
2025-03-21 18:48:17,7, GSM2: Command is sent, waiting for the answer
2025-03-21 18:48:17,7, GSM1: -> AT
2025-03-21 18:48:17,7, GSM1: Command is sent, waiting for the answer
2025-03-21 18:48:17,7, GSM2: <- OK
2025-03-21 18:48:17,6, GSM2: Pre-initializing modem
2025-03-21 18:48:17,7, GSM1: <- OK
2025-03-21 18:48:17,6, GSM1: Pre-initializing modem
2025-03-21 18:48:18,7, GSM2: -> ATE0+CMEE=1;+CREG=2
2025-03-21 18:48:18,7, GSM2: Command is sent, waiting for the answer
2025-03-21 18:48:18,7, GSM1: -> ATE0+CMEE=1;+CREG=2
2025-03-21 18:48:18,7, GSM1: Command is sent, waiting for the answer
2025-03-21 18:48:18,7, GSM2: <- OK
2025-03-21 18:48:18,7, GSM1: <- OK
2025-03-21 18:48:18,7, GSM2: -> AT+CSQ
2025-03-21 18:48:18,7, GSM2: Command is sent, waiting for the answer
2025-03-21 18:48:18,7, GSM1: -> AT+CSQ
2025-03-21 18:48:18,7, GSM1: Command is sent, waiting for the answer
2025-03-21 18:48:18,7, GSM2: <- +CSQ: 20,99 OK
2025-03-21 18:48:18,6, GSM2: Signal Strength Indicator: (20,99) -73 dBm (Excellent), Bit Error Rate: not known or not detectable
2025-03-21 18:48:18,6, GSM2: Checking if Modem is registered to the network
2025-03-21 18:48:18,7, GSM1: <- +CSQ: 20,99 OK
2025-03-21 18:48:18,6, GSM1: Signal Strength Indicator: (20,99) -73 dBm (Excellent), Bit Error Rate: not known or not detectable
2025-03-21 18:48:18,6, GSM1: Checking if Modem is registered to the network
2025-03-21 18:48:19,7, GSM2: -> AT+CREG?
2025-03-21 18:48:19,7, GSM2: Command is sent, waiting for the answer
2025-03-21 18:48:19,7, GSM1: -> AT+CREG?
2025-03-21 18:48:19,7, GSM1: Command is sent, waiting for the answer
2025-03-21 18:48:19,7, GSM2: <- +CREG: 1,1 OK
2025-03-21 18:48:19,6, GSM2: Modem is registered to the network
2025-03-21 18:48:19,6, GSM2: Selecting PDU mode
2025-03-21 18:48:19,7, GSM1: <- +CREG: 1,1 OK
2025-03-21 18:48:19,6, GSM1: Modem is registered to the network
2025-03-21 18:48:19,6, GSM1: Selecting PDU mode
2025-03-21 18:48:19,7, GSM2: -> AT+CMGF=0
2025-03-21 18:48:19,7, GSM2: Command is sent, waiting for the answer
2025-03-21 18:48:19,7, GSM1: -> AT+CMGF=0
2025-03-21 18:48:19,7, GSM1: Command is sent, waiting for the answer
2025-03-21 18:48:19,7, GSM2: <- OK
2025-03-21 18:48:19,6, GSM2: Checking memory size
2025-03-21 18:48:19,7, GSM1: <- OK
2025-03-21 18:48:19,6, GSM1: Checking memory size
2025-03-21 18:48:20,7, GSM1: -> AT+CPMS?
2025-03-21 18:48:20,7, GSM1: Command is sent, waiting for the answer
2025-03-21 18:48:20,7, GSM2: -> AT+CPMS?
2025-03-21 18:48:20,7, GSM2: Command is sent, waiting for the answer
2025-03-21 18:48:20,7, GSM1: <- +CPMS: "SM",0,30,"SM",0,30,"SM",0,30 OK
2025-03-21 18:48:20,6, GSM1: Used memory is 0 of 30
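
The signal strength lines in the log follow the standard +CSQ mapping from 3GPP TS 27.007: the first value (0 to 31) converts to dBm as -113 + 2 × value, and 99 means unknown. The sketch below shows the arithmetic; `parse_csq` and `csq_to_dbm` are hypothetical helper names. The value 20 used in the default responses gives -73 dBm, matching the log.

```python
def parse_csq(reply):
    """Extract (rssi, ber) from a reply such as '+CSQ: 20,99\\r\\nOK\\r\\n'."""
    for line in reply.splitlines():
        if line.startswith("+CSQ:"):
            rssi, ber = line[len("+CSQ:"):].split(",")
            return int(rssi), int(ber)
    raise ValueError("no +CSQ line in reply")

def csq_to_dbm(rssi):
    """Convert the +CSQ rssi value to dBm; 99 means unknown (None)."""
    if rssi == 99:
        return None
    if not 0 <= rssi <= 31:
        raise ValueError(f"rssi out of range: {rssi}")
    # 0 stands for -113 dBm or less, 31 for -51 dBm or greater
    return -113 + 2 * rssi

rssi, ber = parse_csq("+CSQ: 20,99\r\nOK\r\n")
print(rssi, ber, csq_to_dbm(rssi))
```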

Monitoring the Two Modems in NetEye

A Final, Advanced Feature

Now that the modems are up and running, I wanted the two devices to behave as much like real modems as possible, so I added the ability to change their state dynamically through simple calls that override the default responses.

For example, it’s possible to change the reported signal strength or the network registration status of modem 0 (ttys0) with these two commands:

Switch to no signal:

# echo "0|set_state|AT+CSQ|+CSQ: 0,99\r\nOK\r\n" | nc 127.0.0.1 5000
Stato del comando AT+CSQ per modem 0 impostato su: +CSQ: 0,99\r\nOK\r\n

Switch to registration denied:

# echo "0|set_state|AT+CREG?|+CREG: 0,3\r\nOK\r\n" | nc 127.0.0.1 5000
Stato del comando AT+CREG? per modem 0 impostato su: +CREG: 0,3\r\nOK\r\n

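To target the second modem (ttys1, which is GSM2 in the smsd configuration), change the leading 0 to 1.

After the two changes to modem 0, the smsd log shows the new state for GSM1 while GSM2 is unaffected: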

2025-03-21 19:15:24,7, GSM2: Command is sent, waiting for the answer
2025-03-21 19:15:24,7, GSM1: <- +CSQ: 0,99\r\nOK\r\n
2025-03-21 19:15:24,5, GSM1: Signal Strength Indicator: (0,99) -113 dBm (Marginal) or less, Bit Error Rate: not known or not detectable
2025-03-21 19:15:24,7, GSM2: <- +CSQ: 20,99 OK
2025-03-21 19:15:24,6, GSM2: Signal Strength Indicator: (20,99) -73 dBm (Excellent), Bit Error Rate: not known or not detectable
2025-03-21 19:15:24,6, GSM2: Checking if Modem is registered to the network
2025-03-21 19:15:24,7, GSM1: -> AT+CREG?
2025-03-21 19:15:24,7, GSM1: Command is sent, waiting for the answer
2025-03-21 19:15:24,7, GSM2: -> AT+CREG?
2025-03-21 19:15:24,7, GSM2: Command is sent, waiting for the answer
2025-03-21 19:15:24,7, GSM1: <- +CREG: 0,3\r\nOK\r\n
2025-03-21 19:15:24,6, GSM1: Modem said: registration denied. Retrying.
2025-03-21 19:15:24,7, GSM2: <- +CREG: 1,1 OK
2025-03-21 19:15:24,6, GSM2: Modem is registered to the network
2025-03-21 19:15:24,6, GSM2: Selecting PDU mode
2025-03-21 19:15:25,7, GSM2: -> AT+CMGF=0
2025-03-21 19:15:25,7, GSM2: Command is sent, waiting for the answer
2025-03-21 19:15:25,7, GSM2: <- OK
2025-03-21 19:15:25,6, GSM2: Checking memory size
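
Note that in this log the overridden GSM1 replies contain the characters \r\n literally, while the default GSM2 replies do not. The most likely reason is that echo passes the backslash sequences through unchanged and the script stores the received text as-is. smsd evidently still parsed these replies, but if you want real carriage returns and line feeds, a small conversion could be applied to the new response before it is stored. A minimal sketch, where `decode_escapes` is a hypothetical helper that is not part of the script above:

```python
def decode_escapes(text):
    """Turn literal backslash-r and backslash-n sequences into CR and LF."""
    return text.replace("\\r", "\r").replace("\\n", "\n")

# What the control server receives from the echo command shown earlier
raw = "+CSQ: 0,99\\r\\nOK\\r\\n"

print(repr(raw))
print(repr(decode_escapes(raw)))
```

In control_server this would amount to storing decode_escapes(parts[3]) instead of parts[3].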

The following command resets the modem to its default responses:

Reset to Default:

# echo "0|reset" | nc 127.0.0.1 5000
Stato del modem 0 ripristinato ai valori di default.
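
The same control messages can be sent from Python instead of nc, which is handy inside automated tests. The sketch below assumes the message format and the 127.0.0.1:5000 endpoint used by the script; `build_control_message` and `send_control_message` are hypothetical helper names. The raw string mirrors the shell examples by sending the backslash sequences literally.

```python
import socket

def build_control_message(modem_index, action, at_command=None, response=None):
    """Build a message in the 'index|action|...' format the server expects."""
    if action == "set_state":
        if at_command is None or response is None:
            raise ValueError("set_state needs an AT command and a response")
        return f"{modem_index}|set_state|{at_command}|{response}"
    if action == "reset":
        return f"{modem_index}|reset"
    raise ValueError(f"unsupported action: {action}")

def send_control_message(message, host="127.0.0.1", port=5000, timeout=5.0):
    """Send one message to the control server and return its answer."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(message.encode())
        return sock.recv(1024).decode().strip()

no_signal = build_control_message(0, "set_state", "AT+CSQ", r"+CSQ: 0,99\r\nOK\r\n")
print(no_signal)
print(build_control_message(0, "reset"))
```

With the emulator running, send_control_message(no_signal) should return the same confirmation text that nc prints.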

Of course, the messages never leave the system, but we can find them in the failed directory of the smsd spool. There we can check whether a notification contains what we expected and, above all, which of the two modems handled it.

# cd /neteye/local/smsd/data/spool/failed
# cat send_BpJnK9
To: 0039xxxxxxxxx3
Modem: GSM2
IMSI: 123456789012345
Fail_reason: ERROR
Failed: 25-03-20 17:58:34
PDU: 0051000C919343795701370000FFA0050003BF02019C657A315F066D7850E953C82C36373E50333F8D32414461730A9A96E5F6F4B80CAAC3E9E97619F476839C657A315F0635C3737A590ED2BFDD6550D14D86BFD36E3A283D070DA5496A72186486405774D9AD03C56EBA1A0E2483B59A61794B0693D5402DD0509AA426874126A8052287E96171785E06A5E7A03A1C344FBBC765D00CD44EBBEB

NetEye [PROBLEM] MySQL DBMS Service uptime on NetEye Master zone Endpoint is CRITICAL! When: 17:58 20-Mar-2025 - CRITICAL - database is up since 3 minutes
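
When many notifications pile up in the failed directory, reading the headers programmatically is quicker than opening each file. The sketch below assumes the layout shown in the listing above (Key: value header lines, a blank line, then the message text); `parse_spool_file` is a hypothetical helper, and the sample is a shortened copy of that listing with the PDU line left out.

```python
def parse_spool_file(text):
    """Split a spool file into a header dict and the message body."""
    head, _, body = text.partition("\n\n")
    headers = {}
    for line in head.splitlines():
        key, sep, value = line.partition(":")
        if sep:
            headers[key.strip()] = value.strip()
    return headers, body.strip()

# Shortened copy of the failed message shown above (PDU header omitted)
sample = """To: 0039xxxxxxxxx3
Modem: GSM2
IMSI: 123456789012345
Fail_reason: ERROR
Failed: 25-03-20 17:58:34

NetEye [PROBLEM] MySQL DBMS Service uptime on NetEye Master zone Endpoint is CRITICAL!"""

headers, body = parse_spool_file(sample)
print(headers["Modem"], headers["Fail_reason"])
print(body)
```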

The service can surely be improved, and the code is far from perfect, but it does what it’s supposed to do, and for my purposes it is more than sufficient. Maybe it will come in handy for someone else, who can improve it further.
