Komunikasi EL-UHF-RF014 menggunakan Bahasa Pemrograman Python

Tutorial ini menjelaskan cara berkomunikasi dengan reader EL-UHF-RF014 melalui koneksi Serial (RS232) atau TCP/IP tanpa menggunakan SDK.

Komunikasi dengan reader dilakukan langsung menggunakan protokol bawaan, yaitu dengan mengirim dan menerima data dalam bentuk bytes, lalu melakukan parsing untuk memproses respons yang diterima.

Untuk koneksi Serial, diperlukan kabel RS232 to USB agar reader dapat terhubung ke komputer. Sementara itu, koneksi TCP/IP menggunakan kabel Ethernet untuk menghubungkan reader ke komputer atau jaringan.

Berikut adalah diagram koneksi ke komputer:

🐍 Python

Kode Python di bawah ini adalah versi sederhana yang hanya mencakup inventory (answer / active mode). Anda dapat mengembangkannya sendiri sesuai kebutuhan berdasarkan dokumentasi protokol.

transport.py

from abc import ABC, abstractmethod
from socket import socket, AF_INET, SOCK_STREAM
from typing import TypeVar
import serial
 
# NOTE(review): `T` is not referenced by any code visible in this file and its
# bound 'Parent' is not defined here -- confirm whether it is still needed.
T = TypeVar('T', bound='Parent')
 
 
class Transport(ABC):
    @abstractmethod
    def read_bytes(self, length: int) -> bytes:
        raise NotImplementedError
 
    @abstractmethod
    def write_bytes(self, buffer: bytes) -> None:
        raise NotImplementedError
 
    def read_frame(self) -> bytes | None:
        length_bytes = self.read_bytes(1)
        if not length_bytes:
            return
        frame_length = ord(chr(length_bytes[0]))
        data = length_bytes + self.read_bytes(frame_length)
        return bytearray(data)
 
    @abstractmethod
    def close(self) -> None:
        raise NotImplementedError
 
 
class TcpTransport(Transport):
    """Transport over a TCP (AF_INET, SOCK_STREAM) connection."""

    def __init__(self, ip_address: str, port: int, timeout: int = 1) -> None:
        """Connect to the reader.

        Args:
            ip_address: Host address passed to ``socket.connect``.
            port: TCP port passed to ``socket.connect``.
            timeout: Value given to ``socket.settimeout``; it applies to the
                connect call and to every later send/receive.

        Raises:
            OSError: if the connection cannot be established. The socket is
                closed before the error propagates so it does not leak.
        """
        self.socket = socket(AF_INET, SOCK_STREAM)
        try:
            self.socket.settimeout(timeout)
            self.socket.connect((ip_address, port))
        except BaseException:
            self.socket.close()
            raise

    def read_bytes(self, length: int) -> bytes:
        """Receive at most ``length`` bytes from the socket."""
        return self.socket.recv(length)

    def write_bytes(self, buffer: bytes) -> None:
        """Send the whole buffer."""
        self.socket.sendall(buffer)

    def close(self) -> None:
        """Close the socket."""
        self.socket.close()
 
 
class SerialTransport(Transport):
    """Transport over a serial port, backed by ``serial.Serial``."""

    def __init__(self, serial_port: str, baud_rate: int, timeout: int = 1) -> None:
        """Open ``serial_port`` at ``baud_rate``.

        The same ``timeout`` value is used for both reads and writes.
        """
        timeouts = {"timeout": timeout, "write_timeout": timeout}
        self.serial = serial.Serial(serial_port, baud_rate, **timeouts)

    def read_bytes(self, length: int) -> bytes:
        """Read from the port, asking for ``length`` bytes."""
        port = self.serial
        return port.read(length)

    def write_bytes(self, buffer: bytes) -> None:
        """Write ``buffer`` to the port."""
        port = self.serial
        port.write(buffer)

    def close(self) -> None:
        """Close the port."""
        port = self.serial
        port.close()
 

command.py

from utils import calculate_checksum
 
# Command codes placed in the command byte of a frame.
CMD_INVENTORY_ANSWER_MODE: int = 0x01  # tag inventory request (answer mode)
CMD_INVENTORY_ACTIVE_MODE: int = 0xEE  # command byte expected on frames received in active mode
CMD_SET_WORK_MODE: int = 0x76  # modify reader working mode
CMD_GET_WORK_MODE: int = 0x77  # obtain reader working mode
 
 
class Command:
    def __init__(self, command: int, reader_address: int = 0xFF,
                 data: bytes | int | None = None):
        self.command = command
        self.reader_address = reader_address
        self.data = data
        if isinstance(data, int):
            self.data = bytearray([data])
        if data is None:
            self.data = bytearray()
        self.frame_length = 4 + len(self.data)
        self.base_data = bytearray([self.frame_length, self.reader_address, self.command])
        self.base_data.extend(self.data)
 
    def serialize(self) -> bytes:
        serialize = self.base_data
        checksum = calculate_checksum(serialize)
        serialize.extend(checksum)
        return serialize
 

response.py

from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional
 
from utils import verify_checksum, hex_readable
 
 
@dataclass
class Response:
    """A parsed response frame.

    Frame layout: ``[length, reader_address, command, status, *data, crc, crc]``
    where ``length`` counts every byte that follows the length byte itself,
    so a complete frame holds ``length + 1`` bytes.

    Raises:
        ValueError: if the frame has fewer than 6 bytes, is shorter than its
            length byte announces, or carries status 0xFD or 0xFF.
        Exception: raised by ``verify_checksum`` when the checksum is wrong.
    """

    response_bytes: bytes
    length: int = field(init=False)
    reader_address: int = field(init=False)
    command: int = field(init=False)
    status: int = field(init=False)
    data: bytes = field(init=False)
    checksum: bytes = field(init=False)

    def __post_init__(self):
        # Smallest possible frame: length, address, command, status, 2 checksum bytes.
        if len(self.response_bytes) < 6:
            raise ValueError(f"Response data is too short to be valid. Response: {hex_readable(self.response_bytes)}")

        self.length = self.response_bytes[0]
        # The length byte does not count itself, hence the "+ 1".
        if len(self.response_bytes) < self.length + 1:
            raise ValueError("Response length mismatch.")

        self.reader_address = self.response_bytes[1]
        self.command = self.response_bytes[2]
        self.status = self.response_bytes[3]
        # Payload sits between the status byte and the two checksum bytes.
        self.data = self.response_bytes[4 : self.length - 1]
        self.checksum = verify_checksum(self.response_bytes)

        if self.status == 0xFD:
            raise ValueError("Incorrect command frame length.")
        if self.status == 0xFF:
            raise ValueError("Unrecognised parameter.")

    def __str__(self) -> str:
        """Return a multi-line, human-readable dump of the frame fields."""
        parts = [
            ">>> START RESPONSE ================================",
            f"\nRESPONSE       >> {hex_readable(self.response_bytes)}",
            f"\nREADER ADDRESS >> {hex_readable(self.reader_address)}",
            f"\nCOMMAND        >> {hex_readable(self.command)}",
            f"\nSTATUS         >> {hex_readable(self.status)}",
        ]
        if self.data:
            parts.append(f"\nDATA           >> {hex_readable(self.data)}")
        parts.append(f"\nCHECKSUM       >> {hex_readable(self.checksum)}")
        parts.append("\n>>> END RESPONSE   ================================")
        return "".join(parts).strip()
 
 
@dataclass
class WorkMode:
    """Reader working-mode parameters decoded from a 44-byte payload."""

    read_mode: int  # 1 byte
    tag_protocol: int  # 1 byte
    read_pause_time: int  # 1 byte
    filter_time: int  # 1 byte
    q_value: int  # 1 byte (bitwise handling required)
    session: int  # 1 byte
    mask_memory_bank: int  # 1 byte
    mask_start_address: int  # 2 bytes (big-endian)
    mask_length: int  # 1 byte
    mask_data: bytes  # 32 bytes
    address_tid: int  # 1 byte
    length_tid: int  # 1 byte

    @property
    def strategy_indicator(self) -> int:
        """Bit 6 of the raw ``q_value`` byte."""
        return (self.q_value >> 6) & 0b1

    @property
    def original_q_value(self) -> int:
        """Bits 5-0 of the raw ``q_value`` byte.

        NOTE(review): ``Reader.inventory_answer_mode`` packs Q into bits 4-0
        and uses bit 5 for FastID -- confirm this mask against the protocol.
        """
        return self.q_value & 0b111111  # Bits 5-0

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode a working-mode payload.

        Byte layout: 0-6 single-byte fields, 7-8 mask start address
        (big-endian), 9 mask length, 10-41 mask data, 42 TID address,
        43 TID length.

        Raises:
            ValueError: if ``data`` is not exactly 44 bytes long.
        """
        if len(data) != 44:
            raise ValueError(f"Invalid data length ({len(data)}), expected 44 bytes")

        return cls(
            read_mode=data[0],
            tag_protocol=data[1],
            read_pause_time=data[2],
            filter_time=data[3],
            q_value=data[4],
            session=data[5],
            mask_memory_bank=data[6],
            mask_start_address=int.from_bytes(data[7:9], 'big'),
            mask_length=data[9],
            mask_data=data[10:42],
            # The TID fields follow the 32 mask bytes; they must not overlap them.
            address_tid=data[42],
            length_tid=data[43]
        )
 
@dataclass
class ActiveModeTag(Response):
    """Active-mode frame (status 0x00) carrying one tag.

    Payload layout: antenna bitmask, tag length, tag bytes, RSSI byte.
    """

    antennas: Optional[List[int]] = field(init=False, default=None)
    tag: Optional[bytes] = field(init=False, default=None)
    rssi: Optional[int] = field(init=False, default=None)

    def __post_init__(self) -> None:
        super().__post_init__()
        assert self.status == 0x00, "Invalid status for success active mode response."

        payload = self.data
        # Bit n of the first byte set -> antenna n + 1 is reported.
        self.antennas = [bit + 1 for bit in range(8) if payload[0] & (1 << bit)]
        tag_end = payload[1] + 2  # tag bytes start right after the length byte
        self.tag = payload[2:tag_end]  # EPC/TID
        self.rssi = payload[tag_end]  # RSSI
 
 
class AntennaStatus(Enum):
    """Antenna state code; unrecognised codes map to ``UNKNOWN``."""

    IDLE: int = 0x00
    WORKING: int = 0x01
    DISCONNECTED: int = 0x02
    UNKNOWN: int = 0xFF

    @classmethod
    def from_byte(cls, byte: int) -> "AntennaStatus":
        """Return the member whose value is ``byte``, else ``UNKNOWN``."""
        members_by_value = cls._value2member_map_
        return members_by_value.get(byte, cls.UNKNOWN)
 
@dataclass
class ActiveModeHeartbeatStatus:
    """Per-antenna values that ActiveModeHeartbeat extracts from a heartbeat frame."""
    packet_no: int  # one byte taken from heartbeat data bytes 0-3
    antenna_status: AntennaStatus  # decoded from one of heartbeat data bytes 4-7
    total_count: int  # one byte taken from heartbeat data bytes 8-11
 
 
@dataclass
class ActiveModeHeartbeat(Response):
    """Active-mode heartbeat frame (status 0x28) with a 12-byte payload.

    The payload is read as three 4-byte groups; antenna ``n`` receives byte
    ``n - 1`` of each group.

    NOTE(review): confirm against the protocol document that packet number
    and total count are per-antenna bytes rather than 4-byte integers.
    """

    antenna_1: Optional[ActiveModeHeartbeatStatus] = field(init=False, default=None)
    antenna_2: Optional[ActiveModeHeartbeatStatus] = field(init=False, default=None)
    antenna_3: Optional[ActiveModeHeartbeatStatus] = field(init=False, default=None)
    antenna_4: Optional[ActiveModeHeartbeatStatus] = field(init=False, default=None)

    def __post_init__(self) -> None:
        super().__post_init__()
        assert self.status == 0x28, "Invalid status for heartbeat active mode response."
        assert len(self.data) == 12, "Invalid data length for heartbeat packet."

        payload = self.data
        packet_numbers: bytes = payload[:4]
        states: bytes = payload[4:8]
        counts: bytes = payload[8:12]

        def status_for(slot: int) -> ActiveModeHeartbeatStatus:
            # Pick the slot-th byte out of each of the three groups.
            return ActiveModeHeartbeatStatus(
                packet_no=packet_numbers[slot],
                antenna_status=AntennaStatus.from_byte(states[slot]),
                total_count=counts[slot],
            )

        self.antenna_1 = status_for(0)
        self.antenna_2 = status_for(1)
        self.antenna_3 = status_for(2)
        self.antenna_4 = status_for(3)
 
 
@dataclass
class ResponseAnswerModeStatistic(Response):
    """Answer-mode statistics frame (status 0x26).

    Payload layout: antenna bitmask (1 byte), read rate (2 bytes, big-endian),
    total count (4 bytes, big-endian).
    """

    antennas: Optional[List[int]] = field(init=False, default=None)
    read_rate: Optional[int] = field(init=False, default=None)
    total_count: Optional[int] = field(init=False, default=None)

    def __post_init__(self) -> None:
        super().__post_init__()
        assert self.status == 0x26, "Invalid status for statistic answer mode response."

        payload = self.data
        # Antenna numbers are 1-based; antenna n corresponds to bit n - 1.
        self.antennas = [
            number for number in range(1, 9) if (payload[0] >> (number - 1)) & 1
        ]
        self.read_rate = int.from_bytes(payload[1:3], byteorder="big")
        self.total_count = int.from_bytes(payload[3:7], byteorder="big")
 
 
@dataclass
class AnswerModeTag:
    """One tag record parsed from an answer-mode inventory response."""
    fast_id_enabled: Optional[bool]  # bit 7 of the record's header byte
    tag: Optional[bytes]  # EPC bytes; their count comes from header bits 6-0
    rssi: Optional[int]  # RSSI byte that follows the EPC bytes
 
 
@dataclass
class ResponseAnswerMode(Response):
    """Answer-mode inventory frame.

    Payload layout: antenna bitmask, tag count ``num``, then ``num`` records
    of ``[header, EPC bytes..., RSSI]``. In the header byte, bit 7 is the
    FastID flag and bits 6-0 give the EPC length.

    Raises:
        Exception: for status 0xF8 (antenna error) and 0x04 (reader out of
            memory), in addition to what ``Response`` raises.
        IndexError: if the payload is shorter than its tag count implies.
    """

    antennas: Optional[List[int]] = field(init=False, default=None)
    num: Optional[int] = field(init=False, default=None)
    tags: Optional[list[AnswerModeTag]] = field(init=False, default=None)

    def __post_init__(self) -> None:
        super().__post_init__()
        if self.status == 0xF8:
            raise Exception("Antenna error detected, the current antenna might be disconnected.")
        elif self.status == 0x04:
            raise Exception("Reader run out of memory space due to the amount of tags.")

        self.antennas = [i + 1 for i in range(8) if (self.data[0] >> i) & 1]
        self.num = self.data[1]
        self.tags = []

        records = self.data[2:]
        offset = 0
        # Iterate once per announced tag. ``offset`` is a byte position and
        # must not be compared with the tag count.
        for _ in range(self.num):
            # Header (1 byte): FastID flag + EPC length
            header = records[offset]
            fast_id_enabled = bool(header & 0x80)  # bit7
            epc_length = header & 0x7F  # bit6 ~ bit0
            offset += 1

            # EPC data (N bytes)
            tag = records[offset:offset + epc_length]
            offset += epc_length

            # RSSI (1 byte)
            rssi = records[offset]
            offset += 1

            self.tags.append(
                AnswerModeTag(
                    fast_id_enabled=fast_id_enabled,
                    tag=tag,
                    rssi=rssi,
                )
            )
 
 
 

reader.py

from typing import Iterator
from transport import Transport
from command import *
from response import *
 
 
class Reader:
    """Request/response helper for the reader, built on a ``Transport``."""

    def __init__(self, transport: Transport) -> None:
        self.transport = transport

    def close(self) -> None:
        """Close the underlying transport."""
        self.transport.close()

    def _send_request(self, command: Command) -> None:
        """Serialize ``command`` and write it to the transport."""
        self.transport.write_bytes(command.serialize())

    def _get_response(self) -> bytes | None:
        """Read one frame; ``None`` when the transport delivered no data."""
        return self.transport.read_frame()

    def _require_response(self) -> bytes:
        """Read one frame and fail clearly when none arrives.

        Raises:
            TimeoutError: if the transport returned no frame.
        """
        raw_response = self._get_response()
        if raw_response is None:
            raise TimeoutError("No response received from reader.")
        return raw_response

    def set_inventory_work_mode(self, work_mode: int) -> Response:
        """
        8.4.23 Modify reader working mode
        Args:
            work_mode:  0: Answer mode, 1: Active mode, 2: Trigger mode
        Returns:
            Response
        Raises:
            ValueError: if ``work_mode`` is not 0x00, 0x01 or 0x02.
            TimeoutError: if the reader does not answer.
        """
        if work_mode not in (0x00, 0x01, 0x02):
            raise ValueError("Work mode must be 0x00, 0x01, or 0x02")

        self._send_request(Command(CMD_SET_WORK_MODE, data=work_mode))
        return Response(self._require_response())

    def work_mode(self) -> WorkMode:
        """
        8.4.24 Obtain reader working mode
        Returns:
            WorkMode
        Raises:
            TimeoutError: if the reader does not answer.
        """
        self._send_request(Command(CMD_GET_WORK_MODE))
        response: Response = Response(self._require_response())
        return WorkMode.from_bytes(response.data)

    def inventory_active_mode(self) -> Iterator[ActiveModeTag | ActiveModeHeartbeat]:
        """
        Real-time inventory mode.

        Reads frames forever. Status 0x00 frames are yielded as
        ``ActiveModeTag``, status 0x28 frames as ``ActiveModeHeartbeat``;
        frames with any other status are skipped. Read timeouts and empty
        reads are not fatal -- the loop simply tries again.
        """
        while True:
            try:
                raw_response: bytes | None = self._get_response()
            except TimeoutError:
                print("TimeoutError occurred, continue inventory.")
                continue
            if raw_response is None:
                continue

            response: Response = Response(raw_response)
            assert response.command == CMD_INVENTORY_ACTIVE_MODE, (
                f"Unexpected response command: expected 0x{hex_readable(CMD_INVENTORY_ACTIVE_MODE)}, "
                f"but got 0x{hex_readable(response.command)}"
            )

            if response.status == 0x00:
                # Detected appropriate tag
                yield ActiveModeTag(raw_response)
            elif response.status == 0x28:
                # No appropriate tag detected within the heartbeat packet time interval
                yield ActiveModeHeartbeat(raw_response)

    def inventory_answer_mode(
            self,
            statistic_data_flag: bool = False,
            strategy_indicator: bool = False,
            fastid_inventory: bool = False,
            q_value: int = 4,
            session: int = 0,
            mask_memory_bank: int = 1,
            mask_start_address: int = 0,
            mask_data: bytes = bytes(),
            tid_start_address: Optional[int] = None,
            tid_length: Optional[int] = None,
            target: Optional[int] = None,
            antenna: Optional[int] = None,
            scan_time: Optional[int] = None,
    ) -> Iterator[ResponseAnswerMode]:
        """
        8.2.1 Tags inventory
        Args:
            statistic_data_flag (bool): Statistic data packet flag.
            strategy_indicator (bool): Strategy indicator (0=general, 1=special).
            fastid_inventory (bool): Impinj FastID function.
            q_value (int): The Q-value (0-15).
            session (int): Session value (0x00, 0x01, 0x02, 0x03, or 0xff).
            mask_memory_bank (int): Mask memory (0x01=EPC, 0x02=TID, 0x03=User).
            mask_start_address (int): Mask entry bit address (0-16383).
            mask_data (bytes): Mask data. The mask bit length sent to the
                reader is derived from it (``len(mask_data) * 8``) and must
                fit in one byte.
            tid_start_address (int, optional): Entry address of inventory TID
                memory (sent as one byte). Must be given together with
                ``tid_length``; only sent when ``mask_memory_bank`` is 0x02.
            tid_length (int, optional): Data length for TID inventory (0-15).
            target (int, optional): Target value (0x00=A, 0x01=B).
            antenna (int, optional): Antenna selection (0x80-0x83).
            scan_time (int, optional): Inventory scan time (0-255, unit: 100ms).
                ``target``, ``antenna`` and ``scan_time`` must be given all
                together or not at all.

        Returns:
            Iterator[ResponseAnswerMode]: Generator yielding inventory responses.

        Raises:
            ValueError: if an argument is out of range or an all-or-nothing
                group is only partially supplied.
            TimeoutError: if the reader stops answering.
            Exception: if the reader reports an inventory timeout (status 0x02).
        """

        # Validations with detailed messages
        def validate_range(name, value, min_val, max_val):
            if not (min_val <= value <= max_val):
                raise ValueError(f"{name} must be in range {min_val}-{max_val}, but got {value}")

        validate_range("QValue", q_value, 0, 15)
        if session not in (0x00, 0x01, 0x02, 0x03, 0xFF):
            raise ValueError(f"Session must be one of [0x00, 0x01, 0x02, 0x03, 0xFF], but got {session}")
        if mask_memory_bank not in (0x01, 0x02, 0x03):
            raise ValueError(f"MaskMem must be 0x01 (EPC), 0x02 (TID), or 0x03 (User), but got {mask_memory_bank}")
        validate_range("MaskAdr", mask_start_address, 0, 16383)

        tid_requested = tid_start_address is not None or tid_length is not None
        if tid_requested:
            if tid_start_address is None or tid_length is None:
                raise ValueError("AdrTID and LenTID must be provided together.")
            # AdrTID is sent as a single byte below.
            validate_range("AdrTID", tid_start_address, 0, 255)
            validate_range("LenTID", tid_length, 0, 15)

        optional_given = [value is not None for value in (target, antenna, scan_time)]
        extras_requested = any(optional_given)
        if extras_requested and not all(optional_given):
            raise ValueError("Target, Antenna, and ScanTime if one provided, must all be provided.")

        if target is not None and target not in (0x00, 0x01):
            raise ValueError(f"Target must be 0x00 (A) or 0x01 (B), but got {target}")

        if antenna is not None and antenna not in (0x80, 0x81, 0x82, 0x83):
            raise ValueError(f"Antenna must be 0x80, 0x81, 0x82, or 0x83, but got {antenna}")

        if scan_time is not None:
            validate_range("ScanTime", scan_time, 0, 255)

        # MaskLen is the mask size in bits and travels in one byte.
        mask_length = len(mask_data) * 8
        validate_range("MaskLen", mask_length, 0, 255)

        # Construct QValue byte: bit7 statistic flag, bit6 strategy,
        # bit5 FastID, bits 4-0 Q.
        qvalue_byte = (
                (int(statistic_data_flag) << 7) |
                (int(strategy_indicator) << 6) |
                (int(fastid_inventory) << 5) |
                (q_value & 0x1F)
        )

        # Required parameters
        params: bytearray = bytearray()
        params.append(qvalue_byte)
        params.append(session)
        params.append(mask_memory_bank)
        params.extend(mask_start_address.to_bytes(2, byteorder="big"))
        params.append(mask_length)
        params.extend(mask_data)

        # AdrTID & LenTID
        if mask_memory_bank == 0x02 and tid_requested:
            params.extend([tid_start_address, tid_length])

        # Optional parameters (only included if provided)
        if extras_requested:
            params.extend([target, antenna, scan_time])

        # Send command
        command = Command(CMD_INVENTORY_ANSWER_MODE, data=params)
        self._send_request(command)

        # Get responses
        while True:
            response = ResponseAnswerMode(self._require_response())

            assert response.command == CMD_INVENTORY_ANSWER_MODE, (
                f"Unexpected response command: expected {hex(CMD_INVENTORY_ANSWER_MODE)}, "
                f"but got {hex(response.command)}"
            )

            if response.status == 0x03:
                # More frames follow.
                yield response
            elif response.status == 0x02:
                raise Exception("Inventory timeout, operation aborted.")
            elif response.status == 0x01:
                # Operation completed.
                # NOTE(review): tags parsed from this final frame are not
                # yielded -- confirm against the protocol whether it can
                # carry tag records.
                break
 

utils.py

def calculate_checksum(data: bytes) -> bytearray:
    """Compute the 16-bit frame checksum of ``data``.

    Bitwise CRC: start value 0xFFFF, each byte is XORed into the low bits,
    then the register is shifted right eight times, XORing 0x8408 whenever
    the bit shifted out was set.

    Returns:
        Two bytes, low byte first: ``[crc & 0xFF, crc >> 8]``.
    """
    polynomial = 0x8408
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _bit in range(8):
            shifted_out = crc & 0x0001
            crc >>= 1
            if shifted_out:
                crc ^= polynomial
    low_byte = crc & 0xFF
    high_byte = crc >> 8
    return bytearray((low_byte, high_byte))
 
 
def hex_readable(data: bytes | int, bytes_separator: str = " ") -> str:
    if isinstance(data, int):
        return "{:02X}".format(data)
    return bytes_separator.join("{:02X}".format(x) for x in data)
 
 
def verify_checksum(response_bytes: bytes) -> bytes:
    """Check the two trailing checksum bytes of a frame.

    The checksum is recomputed over everything except the last two bytes and
    compared with them (low byte first, as ``calculate_checksum`` returns it).

    Returns:
        The last two bytes of ``response_bytes``.

    Raises:
        Exception: if the trailing bytes do not match the computed checksum.
    """
    body = bytearray(response_bytes[:-2])
    expected_low, expected_high = calculate_checksum(body)
    matches = response_bytes[-2] == expected_low and response_bytes[-1] == expected_high
    if not matches:
        raise Exception(f"Checksum verification failed. Response: {hex_readable(response_bytes)}")
    return response_bytes[-2:]
 
 
def parse_antenna(hex_value: int) -> list[int]:
    """Return the 1-based positions of the set bits in the low 8 bits."""
    return [bit + 1 for bit in range(8) if hex_value & (1 << bit)]
 

main.py

Silakan uncomment kode yang dibutuhkan.

from response import hex_readable, ActiveModeTag, ActiveModeHeartbeat
from transport import SerialTransport, TcpTransport
from reader import Reader
 
transport: SerialTransport = SerialTransport('/dev/ttyUSB0', 57600)
# transport: TcpTransport = TcpTransport('192.168.0.250', 27011)
reader: Reader = Reader(transport)

# The active-mode loop below never ends by itself; try/finally makes sure the
# transport is closed when the script is interrupted or an error is raised.
try:
    # 1. Inventory - Active mode
    for response in reader.inventory_active_mode():
        if isinstance(response, ActiveModeTag):
            print(f'Antenna: {response.antennas} > RSSI: {response.rssi} > Tag: {hex_readable(response.tag)}')
        elif isinstance(response, ActiveModeHeartbeat):
            print('>> Heartbeat')

    #########################################################

    # 2. Inventory - Answer mode
    # for _ in range(10): # Loop 10 times
    #     for response in reader.inventory_answer_mode(
    #             # mask_memory_bank=0x01, # EPC
    #             # mask_start_address=0x04 * 8, # 4th byte
    #             # mask_data=bytearray([0xE2, 0x00, 0xA8]), # EPC Mask start with: E2 00 A8 ...
    #     ):
    #         if not response.tags:
    #             print("No tag detected")
    #         for tag in response.tags:
    #             print(f"Antenna: {response.antennas} > RSSI: {tag.rssi} > Tag: {hex_readable(tag.tag)}")

    #########################################################

    # 3. Set inventory work mode
    # work_mode: int = 0x00 # 0: Answer mode, 1: Active mode, 2: Trigger mode
    # response_set_work_mode = reader.set_inventory_work_mode(work_mode)
finally:
    reader.close()

Output

Antenna: [1] > RSSI: 81 > Tag: E2 00 A8 32 FE 55 83 91 CE 26 89 AB
Antenna: [4] > RSSI: 98 > Tag: E2 00 42 51 CC 12 34 56 78 90 AB CD