Komunikasi EL-UHF-RC4 dengan Komputer

Pada artikel ini, kami mencoba berkomunikasi dengan reader EL-UHF-RC4 series menggunakan protokol melalui koneksi Serial (RS232) atau TCP/IP.

Menggunakan koneksi Serial diperlukan kabel RS232 to USB untuk menghubungkan reader ke komputer. Sedangkan untuk koneksi TCP/IP, reader terhubung melalui kabel Ethernet ke komputer/router.

Kode

🐍 Python

Kode python di bawah ini adalah versi sederhana. Contoh kode Python lengkap tersedia dalam program demo (dibuat menggunakan bahasa Python), link download: https://cloud.electron.id/s/wSap3C2PiWXz4Jj

transport.py

import glob
import sys
from enum import Enum
from abc import ABC, abstractmethod
from socket import socket, AF_INET, SOCK_STREAM
from typing import TypeVar
 
import serial
 
 
T = TypeVar('T', bound='Parent')
 
 
class BaudRate(Enum):
    BPS_9600 = 0x00
    BPS_19200 = 0x01
    BPS_38400 = 0x02
    BPS_57600 = 0x03
    BPS_115200 = 0x04
 
    _ignore_ = ["INT"]
    INT = []
 
    def __str__(self) -> str:
        return f'{self.to_int} bps'
 
    @property
    def to_int(self) -> int:
        return self.INT[self.value]
 
    @classmethod
    def from_int(cls, value: int) -> T:
        for baud_rate in BaudRate:
            if baud_rate.to_int == value:
                return baud_rate
 
 
BaudRate.INT = [9600, 19200, 38400, 57600, 115200]
 
 
class Transport(ABC):
    @abstractmethod
    def connect(self, **kwargs) -> None:
        raise NotImplementedError
 
    @abstractmethod
    def reconnect(self, **kwargs) -> bytes:
        raise NotImplementedError
 
    @abstractmethod
    def read_bytes(self, **kwargs) -> bytes:
        raise NotImplementedError
 
    @abstractmethod
    def write_bytes(self, buffer: bytes) -> None:
        raise NotImplementedError
 
    @abstractmethod
    def clear_buffer(self) -> None:
        raise NotImplementedError
 
    @abstractmethod
    def close(self) -> None:
        raise NotImplementedError
 
 
class TcpTransport(Transport):
    def __init__(self, ip_address: str, port: int, timeout: int = 6) -> None:
        self.ip_address: str = ip_address
        self.port: int = port
        self.timeout: int = timeout
        self.socket: socket | None = None
 
    def __str__(self) -> str:
        return f'TcpTransport(ip_address: {self.ip_address}, port: {self.port}, timeout: {self.timeout})'
 
    def connect(self) -> None:
        self.socket = socket(AF_INET, SOCK_STREAM)
        self.socket.settimeout(self.timeout)
        self.socket.connect((self.ip_address, self.port))
 
    def reconnect(self, ip_address: str | None = None, port: int | None = None,
                  timeout: int | None = None) -> None:
        self.socket.close()
        self.ip_address = ip_address if ip_address else self.ip_address
        self.timeout = timeout if timeout else self.timeout
        self.connect()
 
    def read_bytes(self, length: int) -> bytes:
        return self.socket.recv(length)
 
    def write_bytes(self, buffer: bytes) -> None:
        self.socket.sendall(buffer)
 
    def clear_buffer(self) -> None:
        # self.socket.recv(1024)
        pass
 
    def close(self) -> None:
        self.socket.close()
 
 
class SerialTransport(Transport):
    def __init__(self, serial_port: str, baud_rate: BaudRate, timeout: int = 0.5) -> None:
        self.serial_port = serial_port
        self.baud_rate = baud_rate
        self.timeout = timeout
        self.serial = serial.Serial(self.serial_port, self.baud_rate.to_int,
                                    timeout=timeout, write_timeout=timeout * 2)
 
    def __str__(self) -> str:
        return f'SerialTransport(port: {self.serial_port}, baud_rate: {self.baud_rate})'
 
    @classmethod
    def scan(cls, timeout: int = 1) -> list[str]:
        result: list[str] = []
        if sys.platform.startswith("win"):  # Windows
            ports = ["COM%s" % (i + 1) for i in range(15)]
        elif sys.platform.startswith("linux") or sys.platform.startswith("cygwin"):  # Linux
            # this excludes your current terminal "/dev/tty"
            ports = glob.glob("/dev/tty[A-Za-z]*")
        elif sys.platform.startswith("darwin"):  # Mac OS
            ports = glob.glob("/dev/tty.*")
        else:
            raise EnvironmentError("Unsupported platform")
        for port in ports:
            try:
                s = serial.Serial(port, timeout=timeout)
                s.close()
                result.append(port)
            except serial.SerialException as _:
                pass
        return result
 
    def connect(self, **kwargs):
        pass
 
    def reconnect(self, serial_port: str | None = None, baud_rate: BaudRate | None = None,
                  timeout: int | None = None) -> None:
        self.close()
        self.serial_port = serial_port if serial_port else self.serial.port
        self.baud_rate = baud_rate if baud_rate else BaudRate.from_int(self.serial.baudrate)
        self.timeout = timeout if timeout else self.serial.timeout
        self.serial = serial.Serial(self.serial_port, self.baud_rate.to_int,
                                    timeout=self.timeout, write_timeout=self.timeout * 2)
 
    def read_bytes(self, length: int) -> bytes:
        response = self.serial.read(length)
        return response
 
    def write_bytes(self, buffer: bytes) -> None:
        self.serial.write(buffer)
 
    def clear_buffer(self) -> None:
        self.serial.reset_input_buffer()
        self.serial.reset_output_buffer()
 
    def close(self) -> None:
        self.serial.close()

command.py

from enum import Enum
from utils import calculate_checksum
 
 
HEADER = 0xCF
BROADCAST_ADDRESS = 0xFF
 
 
class CommandOption(Enum):
    SET = 0x01
    GET = 0x02
 
 
class CommandRequest(Enum):
    SET_POWER = 0x0053
    GET_DEVICE_INFO = 0x0070
    INVENTORY_ISO_CONTINUE = 0x0001
    INVENTORY_STOP = 0x0002
    INVENTORY_ACTIVE = 0x0001
 
 
class Command(object):
    def __init__(self,
                 command: CommandRequest,
                 address=BROADCAST_ADDRESS,
                 data: bytes | bytearray = bytearray()) -> None:
        self.address = address
        self.command = command
        self.data = data
 
    def serialize(self, with_checksum: bool = True) -> bytes:
        base_data = bytearray(
            [HEADER, self.address]) + \
                    self.command.value.to_bytes(2, "big") + \
                    bytearray([len(self.data)]) + \
                    bytearray(self.data)
        if with_checksum:
            checksum = calculate_checksum(base_data)
            base_data.extend(checksum)
        return base_data

response.py

from dataclasses import dataclass
from enum import Enum
 
from command import HEADER, CommandRequest
from utils import calculate_checksum, hex_readable
 
 
class Status(Enum):
    SUCCESS = 0x00
    WRONG_PARAM = 0x01
    CMD_EXECUTION_FAILED = 0x02
    RESERVE = 0x03
    NO_COUNT_LABEL = 0x12
    TIMEOUT = 0x14
    TAG_RESPONSE_ERROR = 0x15
    AUTHENTICATION_FAILED = 0x16
    WRONG_PASSWORD = 0x17
    NO_MORE_DATA = 0xFF
 
 
class TagStatus(Enum):
    NO_ERROR = 0xFF
    TIMEOUT = 0x14
    OTHER_ERROR = 0x81
    STORAGE_AREA_ERROR = 0x82
    STORAGE_LOCK = 0x83
    INSUFFICIENT_POWER = 0x84
    NO_POWER = 0x85
 
 
class InventoryStatus(Enum):
    SUCCESS = 0x00
    WRONG_PARAM = 0x01
    CMD_EXECUTION_FAILED = 0x02
    NO_COUNT_LABEL = 0x12
    EXCEED_MAX_TRANSMIT_SERIAL = 0x17
 
 
@dataclass
class Tag:
    rssi: bytes
    antenna: int
    channel: int
    data: bytes
    count: int = 1
 
    def __str__(self) -> str:
        return f'Tag(rssi: {hex_readable(self.rssi)}, ' \
               f'antenna: {self.antenna}, channel: {self.channel}, ' \
               f'data: {hex_readable(self.data)})'
 
 
class Response:
    def __init__(self, response: bytes) -> None:
        if response is None:
            raise ValueError("Response is None")
 
        header_section: bytes = response[0:5]
        assert header_section[0] == HEADER
        self.header: int = response[0]
        self.address: int = response[1]
        self.command: CommandRequest = CommandRequest(int.from_bytes(response[2:4], "big"))
        self.length: int = response[4]
        self.status: Status = Status(response[5])
 
        __body_n_checksum_section: bytes = response[6: 4 + self.length + 2 + 1]
        self.payload: bytes = __body_n_checksum_section[0:-2]
        self.checksum: bytes = __body_n_checksum_section[-2:]
 
        # Verify checksum
        data = bytearray(header_section)
        data.extend(bytearray([self.status.value]))
        if self.payload:
            data.extend(self.payload)
        crc_msb, crc_lsb = calculate_checksum(data)
        assert self.checksum[0] == crc_msb and self.checksum[1] == crc_lsb

reader.py

from enum import Enum
from typing import Iterator
 
from command import CommandRequest, Command
from response import Response, InventoryStatus, Tag
from transport import Transport
from utils import hex_readable
 
 
class StopType(Enum):
    """\
    TIME: int = According to the time (in seconds)
 
    NUMBER: int = According to the number or cycles
    """
    TIME = 0x00
    NUMBER = 0x01
 
 
class Reader:
    def __init__(self, transport: Transport) -> None:
        super().__init__()
        self.transport = transport
        self.transport.connect()
 
    def close(self) -> None:
        self.transport.close()
 
    def __receive_response(self) -> bytes | None:
        # Get header section
        response_header_section: bytes = self.transport.read_bytes(length=5)
        if not response_header_section:
            return
 
        assert len(response_header_section) == 5
 
        # Get body section
        body_length: int = response_header_section[-1]
        response_body_section: bytes = self.transport.read_bytes(length=body_length + 2)  # 2(checksum)
 
        return response_header_section + response_body_section
 
 
    def set_power(self, power: int) -> Response:
        cmd_request: CommandRequest = CommandRequest.SET_POWER
        command: Command = Command(cmd_request, data=bytearray([power, 0x00]))
 
        # Send request
        self.transport.write_bytes(command.serialize())
 
        # Receive response
        raw_response: bytes | None = self.__receive_response()
        if raw_response is None:
            raise RuntimeError("No response from reader.")
        response: Response = Response(raw_response)
 
        # Validation response
        assert response.command == cmd_request
 
        return response
 
    def start_inventory_answer_mode(self, stop_type: StopType, value: int) -> Iterator[Tag]:
        cmd_request: CommandRequest = CommandRequest.INVENTORY_ISO_CONTINUE
        data = bytearray([stop_type.value])
        data.extend(value.to_bytes(4, "big"))
        command = Command(cmd_request, data=data)
 
        # Send request
        self.transport.write_bytes(command.serialize())
 
        while True:
            raw_response: bytes | None = self.__receive_response()
            if raw_response is None:
                return
            response: Response = Response(raw_response)
 
            if response.command == CommandRequest.INVENTORY_STOP or not response.payload:
                print("Inventory finished!")
                break
 
            tag: Tag = Tag(
                rssi=response.payload[0:2],
                antenna=response.payload[2],
                channel=response.payload[3],
                data=response.payload[5:])
            yield tag
 
    def stop_inventory_answer_mode(self) -> Response:
        cmd_request: CommandRequest = CommandRequest.INVENTORY_STOP
        command: Command = Command(cmd_request)
 
        # Send request
        self.transport.write_bytes(command.serialize())
 
        # Receive response
        response: Response = Response(self.__receive_response())
 
        # Validation response
        assert response.command == cmd_request
 
        return response
 

utils.py

from array import array
 
 
def hex_readable(data_bytes: bytes | array, separator: str = " ") -> str:
    return separator.join("{:02X}".format(x) for x in data_bytes)
 
 
def ip_bytes(ip_str: str) -> bytearray:
    ip_str_split = ip_str.split('.')
    assert len(ip_str_split) == 4
 
    return bytearray([int(ip) for ip in ip_str_split])
 
 
def calculate_checksum(data: bytes) -> bytearray:
    value = 0xFFFF
    for d in data:
        value ^= d
        for _ in range(8):
            value = (value >> 1) ^ 0x8408 if value & 0x0001 else (value >> 1)
    crc_msb = value >> 0x08
    crc_lsb = value & 0xFF
    return bytearray([crc_msb, crc_lsb])
 
 
def calculate_rssi(rssi: bytes) -> int:
    return int.from_bytes(rssi, "big", signed=True)
 

main.py

from typing import Iterator
 
from response import Tag
from transport import Transport, SerialTransport, BaudRate, TcpTransport
from reader import Reader, StopType
 
transport: Transport = SerialTransport('/dev/ttyUSB0', BaudRate.BPS_115200)
# transport: Transport = TcpTransport('192.168.1.250', 2022)
reader: Reader = Reader(transport)
 
# 1. Inventory answer mode
tags: Iterator[Tag] = reader.start_inventory_answer_mode(stop_type=StopType.TIME, value=3) # Stop after 3 seconds
for tag in tags:
    print(tag)
 
# 2. Set power
# reader.set_power(10)
 
reader.close()