Komunikasi EL-UHF-RF014 menggunakan Bahasa Pemrograman Python
Tutorial ini menjelaskan cara berkomunikasi dengan reader EL-UHF-RF014 melalui koneksi Serial (RS232) atau TCP/IP tanpa menggunakan SDK.
Komunikasi dengan reader dilakukan langsung menggunakan protokol bawaan, yaitu dengan mengirim dan menerima data dalam bentuk bytes, lalu melakukan parsing untuk memproses respons yang diterima.
Untuk koneksi Serial, diperlukan kabel RS232 to USB agar reader dapat terhubung ke komputer. Sementara itu, koneksi TCP/IP menggunakan kabel Ethernet untuk menghubungkan reader ke komputer atau jaringan.
Berikut adalah diagram koneksi ke komputer:
🐍 Python
Kode Python di bawah ini adalah versi sederhana yang hanya mencakup inventory (answer / active mode). Anda dapat mengembangkannya sendiri sesuai kebutuhan berdasarkan dokumentasi protokol.
transport.py
from abc import ABC, abstractmethod
from socket import socket, AF_INET, SOCK_STREAM
from typing import TypeVar
import serial
# NOTE(review): T is not referenced anywhere in the visible code, and its bound
# 'Parent' does not name a class defined here -- confirm whether it is still needed.
T = TypeVar('T', bound='Parent')
class Transport(ABC):
@abstractmethod
def read_bytes(self, length: int) -> bytes:
raise NotImplementedError
@abstractmethod
def write_bytes(self, buffer: bytes) -> None:
raise NotImplementedError
def read_frame(self) -> bytes | None:
length_bytes = self.read_bytes(1)
if not length_bytes:
return
frame_length = ord(chr(length_bytes[0]))
data = length_bytes + self.read_bytes(frame_length)
return bytearray(data)
@abstractmethod
def close(self) -> None:
raise NotImplementedError
class TcpTransport(Transport):
    """Transport over a TCP stream socket."""

    def __init__(self, ip_address: str, port: int, timeout: int = 1) -> None:
        """Open a TCP connection to ``ip_address:port``.

        Args:
            ip_address: Address of the reader.
            port: TCP port of the reader.
            timeout: Socket timeout in seconds, applied to connect and I/O.
        """
        self.socket = socket(AF_INET, SOCK_STREAM)
        try:
            self.socket.settimeout(timeout)
            self.socket.connect((ip_address, port))
        except BaseException:
            # Do not leak the file descriptor when the connection attempt fails.
            self.socket.close()
            raise

    def read_bytes(self, length: int) -> bytes:
        """Receive up to ``length`` bytes from the socket."""
        return self.socket.recv(length)

    def write_bytes(self, buffer: bytes) -> None:
        """Send the whole ``buffer`` to the reader."""
        self.socket.sendall(buffer)

    def close(self) -> None:
        """Close the socket."""
        self.socket.close()
class SerialTransport(Transport):
    """Transport over a serial port opened through ``serial.Serial``."""

    def __init__(self, serial_port: str, baud_rate: int, timeout: int = 1) -> None:
        """Open ``serial_port`` at ``baud_rate``.

        The same ``timeout`` value is used for both reads and writes.
        """
        self.serial = serial.Serial(
            port=serial_port,
            baudrate=baud_rate,
            timeout=timeout,
            write_timeout=timeout,
        )

    def read_bytes(self, length: int) -> bytes:
        """Read up to ``length`` bytes from the port."""
        return self.serial.read(length)

    def write_bytes(self, buffer: bytes) -> None:
        """Write ``buffer`` to the port."""
        self.serial.write(buffer)

    def close(self) -> None:
        """Close the port."""
        self.serial.close()
command.py
from utils import calculate_checksum
# Command codes placed in the third byte of a request frame.
# Sent by Reader.inventory_answer_mode ("8.2.1 Tags inventory").
CMD_INVENTORY_ANSWER_MODE: int = 0x01
# Command code Reader.inventory_active_mode expects on every incoming frame.
CMD_INVENTORY_ACTIVE_MODE: int = 0xEE
# Sent by Reader.set_inventory_work_mode ("8.4.23 Modify reader working mode").
CMD_SET_WORK_MODE: int = 0x76
# Sent by Reader.work_mode ("8.4.24 Obtain reader working mode").
CMD_GET_WORK_MODE: int = 0x77
class Command:
def __init__(self, command: int, reader_address: int = 0xFF,
data: bytes | int | None = None):
self.command = command
self.reader_address = reader_address
self.data = data
if isinstance(data, int):
self.data = bytearray([data])
if data is None:
self.data = bytearray()
self.frame_length = 4 + len(self.data)
self.base_data = bytearray([self.frame_length, self.reader_address, self.command])
self.base_data.extend(self.data)
def serialize(self) -> bytes:
serialize = self.base_data
checksum = calculate_checksum(serialize)
serialize.extend(checksum)
return serialize
response.py
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional
from utils import verify_checksum, hex_readable
@dataclass
class Response:
    """A parsed response frame.

    Layout of ``response_bytes``: length, reader address, command, status,
    data, then a two-byte checksum. All fields other than ``response_bytes``
    are filled in by ``__post_init__``.
    """

    response_bytes: bytes
    length: int = field(init=False)
    reader_address: int = field(init=False)
    command: int = field(init=False)
    status: int = field(init=False)
    data: bytes = field(init=False)
    checksum: bytes = field(init=False)

    def __post_init__(self):
        """Split the frame into fields and validate it.

        Raises:
            ValueError: If the frame is shorter than 6 bytes, shorter than its
                length byte declares, or carries status 0xFD / 0xFF.
            Exception: If checksum verification fails (raised by
                ``verify_checksum``).
        """
        if len(self.response_bytes) < 6:
            raise ValueError(f"Response data is too short to be valid. Response: {hex_readable(self.response_bytes)}")
        self.length = self.response_bytes[0]
        # The length byte does not count itself, so a complete frame holds
        # length + 1 bytes.
        if len(self.response_bytes) < self.length + 1:
            raise ValueError("Response length mismatch.")
        self.reader_address = self.response_bytes[1]
        self.command = self.response_bytes[2]
        self.status = self.response_bytes[3]
        # Data sits between the status byte and the trailing two checksum bytes.
        self.data = self.response_bytes[4 : self.length - 1]
        self.checksum = verify_checksum(self.response_bytes)
        if self.status == 0xFD:
            raise ValueError("Incorrect command frame length.")
        if self.status == 0xFF:
            raise ValueError("Unrecognised parameter.")

    def __str__(self) -> str:
        """Return a multi-line hex dump of the frame and its fields."""
        parts = [
            ">>> START RESPONSE ================================",
            f"\nRESPONSE >> {hex_readable(self.response_bytes)}",
            f"\nREADER ADDRESS >> {hex_readable(self.reader_address)}",
            f"\nCOMMAND >> {hex_readable(self.command)}",
            f"\nSTATUS >> {hex_readable(self.status)}",
        ]
        if self.data:
            parts.append(f"\nDATA >> {hex_readable(self.data)}")
        parts.append(f"\nCHECKSUM >> {hex_readable(self.checksum)}")
        parts.append("\n>>> END RESPONSE ================================")
        return "".join(parts).strip()
@dataclass
class WorkMode:
    """Reader working-mode settings decoded from a 44-byte payload."""

    read_mode: int  # 1 byte
    tag_protocol: int  # 1 byte
    read_pause_time: int  # 1 byte
    filter_time: int  # 1 byte
    q_value: int  # 1 byte (bitwise handling required)
    session: int  # 1 byte
    mask_memory_bank: int  # 1 byte
    mask_start_address: int  # 2 bytes (big-endian)
    mask_length: int  # 1 byte
    mask_data: bytes  # 32 bytes
    address_tid: int  # 1 byte
    length_tid: int  # 1 byte

    @property
    def strategy_indicator(self) -> int:
        """Bit 6 of ``q_value``."""
        return (self.q_value >> 6) & 0b1

    @property
    def original_q_value(self) -> int:
        """Bits 5-0 of ``q_value``."""
        return self.q_value & 0b111111  # Bits 5-0

    @classmethod
    def from_bytes(cls, data: bytes):
        """Decode a 44-byte working-mode payload.

        Args:
            data: Exactly 44 bytes, laid out in field order.

        Raises:
            ValueError: If ``data`` is not 44 bytes long.
        """
        if len(data) != 44:
            raise ValueError(f"Invalid data length ({len(data)}), expected 44 bytes")
        return cls(
            read_mode=data[0],
            tag_protocol=data[1],
            read_pause_time=data[2],
            filter_time=data[3],
            q_value=data[4],
            session=data[5],
            mask_memory_bank=data[6],
            mask_start_address=int.from_bytes(data[7:9], 'big'),
            mask_length=data[9],
            mask_data=data[10:42],
            # The two TID bytes follow the 32-byte mask; offsets 40/41 would
            # re-read the tail of mask_data.
            address_tid=data[42],
            length_tid=data[43],
        )
@dataclass
class ActiveModeTag(Response):
    """Active-mode response (status 0x00) carrying one tag.

    The data field is read as: antenna bitmask, tag length, tag bytes, RSSI.
    """

    antennas: Optional[List[int]] = field(init=False, default=None)
    tag: Optional[bytes] = field(init=False, default=None)
    rssi: Optional[int] = field(init=False, default=None)

    def __post_init__(self) -> None:
        """Parse the base frame, then the tag payload."""
        super().__post_init__()
        assert self.status == 0x00, "Invalid status for success active mode response."
        payload = self.data
        antenna_mask = payload[0]
        # Bit i set in the mask means antenna i + 1.
        self.antennas = [bit + 1 for bit in range(8) if antenna_mask & (1 << bit)]
        tag_end = payload[1] + 2  # tag bytes start at offset 2
        self.tag = payload[2:tag_end]  # EPC/TID
        self.rssi = payload[tag_end]  # RSSI follows the tag
class AntennaStatus(Enum):
    """Antenna state byte; values without a member map to ``UNKNOWN``."""

    IDLE: int = 0x00
    WORKING: int = 0x01
    DISCONNECTED: int = 0x02
    UNKNOWN: int = 0xFF

    @classmethod
    def from_byte(cls, byte: int) -> "AntennaStatus":
        """Return the member whose value is ``byte``, or ``UNKNOWN`` if none."""
        return cls._value2member_map_.get(byte, cls.UNKNOWN)
@dataclass
class ActiveModeHeartbeatStatus:
    """Per-antenna values taken from one heartbeat packet."""

    # Byte taken from the packet-number part of the heartbeat data.
    packet_no: int
    # Decoded antenna state byte.
    antenna_status: AntennaStatus
    # Byte taken from the total-count part of the heartbeat data.
    total_count: int
@dataclass
class ActiveModeHeartbeat(Response):
    """Active-mode heartbeat response (status 0x28) with 12 data bytes.

    The data is read as three groups of four bytes (packet numbers, antenna
    states, total counts); position ``k`` in each group belongs to antenna
    ``k + 1``.
    """

    antenna_1: Optional[ActiveModeHeartbeatStatus] = field(init=False, default=None)
    antenna_2: Optional[ActiveModeHeartbeatStatus] = field(init=False, default=None)
    antenna_3: Optional[ActiveModeHeartbeatStatus] = field(init=False, default=None)
    antenna_4: Optional[ActiveModeHeartbeatStatus] = field(init=False, default=None)

    def __post_init__(self) -> None:
        """Parse the base frame, then build one status object per antenna."""
        super().__post_init__()
        assert self.status == 0x28, "Invalid status for heartbeat active mode response."
        assert len(self.data) == 12, "Invalid data length for heartbeat packet."

        packet_numbers: bytes = self.data[:4]
        states: bytes = self.data[4:8]
        counts: bytes = self.data[8:12]

        per_antenna = [
            ActiveModeHeartbeatStatus(
                packet_no=packet_numbers[slot],
                antenna_status=AntennaStatus.from_byte(states[slot]),
                total_count=counts[slot],
            )
            for slot in range(4)
        ]
        self.antenna_1, self.antenna_2, self.antenna_3, self.antenna_4 = per_antenna
@dataclass
class ResponseAnswerModeStatistic(Response):
    """Answer-mode statistic response (status 0x26).

    The data field is read as: antenna bitmask (1 byte), read rate (2 bytes,
    big-endian), total count (4 bytes, big-endian).
    """

    antennas: Optional[List[int]] = field(init=False, default=None)
    read_rate: Optional[int] = field(init=False, default=None)
    total_count: Optional[int] = field(init=False, default=None)

    def __post_init__(self) -> None:
        """Parse the base frame, then the statistic payload."""
        super().__post_init__()
        assert self.status == 0x26, "Invalid status for statistic answer mode response."
        payload = self.data
        antenna_mask = payload[0]
        # Bit i set in the mask means antenna i + 1.
        self.antennas = [bit + 1 for bit in range(8) if antenna_mask & (1 << bit)]
        self.read_rate = int.from_bytes(payload[1:3], "big")
        self.total_count = int.from_bytes(payload[3:7], "big")
@dataclass
class AnswerModeTag:
    """One tag record extracted from an answer-mode inventory response."""

    # Bit 7 of the record's header byte, as a bool.
    fast_id_enabled: Optional[bool]
    # Tag bytes that follow the header byte.
    tag: Optional[bytes]
    # RSSI byte that follows the tag bytes.
    rssi: Optional[int]
@dataclass
class ResponseAnswerMode(Response):
    """Answer-mode inventory response.

    The data field is read as: antenna bitmask, tag count, then one record
    per tag (header byte, tag bytes, RSSI byte).
    """

    antennas: Optional[List[int]] = field(init=False, default=None)
    num: Optional[int] = field(init=False, default=None)
    tags: Optional[list[AnswerModeTag]] = field(init=False, default=None)

    def __post_init__(self) -> None:
        """Parse the base frame, then every tag record.

        Raises:
            Exception: If the status is 0xF8 (antenna error) or 0x04 (reader
                out of memory).
            IndexError: If the data field is shorter than its own tag count
                and record headers declare.
        """
        super().__post_init__()
        if self.status == 0xF8:
            raise Exception("Antenna error detected, the current antenna might be disconnected.")
        if self.status == 0x04:
            raise Exception("Reader run out of memory space due to the amount of tags.")

        # Bit i set in the mask means antenna i + 1.
        self.antennas = [i + 1 for i in range(8) if (self.data[0] >> i) & 1]
        self.num = self.data[1]
        self.tags = []

        data = self.data[2:]
        index = 0  # byte offset into the tag records
        # Iterate once per declared tag; `index` is a byte offset and must not
        # be compared against the tag count.
        for _ in range(self.num):
            # Header byte: bit 7 is the FastID flag, bits 6-0 the tag length.
            epc_header = data[index]
            fast_id_enabled = bool(epc_header & 0x80)
            epc_length = epc_header & 0x7F
            index += 1

            # Tag bytes.
            tag = data[index:index + epc_length]
            index += epc_length

            # RSSI byte.
            rssi = data[index]
            index += 1

            self.tags.append(
                AnswerModeTag(
                    fast_id_enabled=fast_id_enabled,
                    tag=tag,
                    rssi=rssi,
                )
            )
reader.py
from typing import Iterator
from transport import Transport
from command import *
from response import *
class Reader:
    """Command/response driver for the reader on top of a ``Transport``."""

    def __init__(self, transport: Transport) -> None:
        """Wrap an already opened transport."""
        self.transport = transport

    def close(self) -> None:
        """Close the underlying transport."""
        self.transport.close()

    def _send_request(self, command: Command) -> None:
        """Serialize ``command`` and write it to the transport."""
        self.transport.write_bytes(command.serialize())

    def _get_response(self) -> bytes | None:
        """Read one frame; ``None`` when the transport delivered nothing."""
        return self.transport.read_frame()

    def _require_response(self) -> bytes:
        """Read one frame, raising ``TimeoutError`` when nothing arrived."""
        raw_response = self._get_response()
        if raw_response is None:
            raise TimeoutError("No response received from the reader.")
        return raw_response

    @staticmethod
    def _validate_range(name: str, value: int, min_val: int, max_val: int) -> None:
        """Raise ``ValueError`` unless ``min_val <= value <= max_val``."""
        if not (min_val <= value <= max_val):
            raise ValueError(f"{name} must be in range {min_val}-{max_val}, but got {value}")

    def set_inventory_work_mode(self, work_mode: int) -> Response:
        """
        8.4.23 Modify reader working mode

        Args:
            work_mode: 0: Answer mode, 1: Active mode, 2: Trigger mode

        Returns:
            Response

        Raises:
            ValueError: If ``work_mode`` is not 0x00, 0x01 or 0x02.
            TimeoutError: If the reader does not answer.
        """
        if work_mode not in (0x00, 0x01, 0x02):
            raise ValueError("Work mode must be 0x00, 0x01, or 0x02")
        self._send_request(Command(CMD_SET_WORK_MODE, data=work_mode))
        return Response(self._require_response())

    def work_mode(self) -> WorkMode:
        """
        8.4.24 Obtain reader working mode

        Returns:
            WorkMode

        Raises:
            TimeoutError: If the reader does not answer.
        """
        self._send_request(Command(CMD_GET_WORK_MODE))
        response: Response = Response(self._require_response())
        return WorkMode.from_bytes(response.data)

    def inventory_active_mode(self) -> Iterator[ActiveModeTag | ActiveModeHeartbeat]:
        """
        Real-time inventory mode.

        Loops forever reading frames pushed by the reader. Read timeouts and
        empty reads are skipped; frames whose status is neither 0x00 (tag) nor
        0x28 (heartbeat) are ignored.

        Yields:
            ActiveModeTag for status 0x00, ActiveModeHeartbeat for status 0x28.
        """
        while True:
            try:
                raw_response: bytes | None = self._get_response()
            except TimeoutError:
                print("TimeoutError occurred, continue inventory.")
                continue
            if raw_response is None:
                continue

            response: Response = Response(raw_response)
            assert response.command == CMD_INVENTORY_ACTIVE_MODE, (
                f"Unexpected response command: expected 0x{hex_readable(CMD_INVENTORY_ACTIVE_MODE)}, "
                f"but got 0x{hex_readable(response.command)}"
            )

            if response.status == 0x00:  # Detected appropriate tag
                yield ActiveModeTag(raw_response)
            elif response.status == 0x28:  # No tag within the heartbeat interval
                yield ActiveModeHeartbeat(raw_response)

    def inventory_answer_mode(
        self,
        statistic_data_flag: bool = False,
        strategy_indicator: bool = False,
        fastid_inventory: bool = False,
        q_value: int = 4,
        session: int = 0,
        mask_memory_bank: int = 1,
        mask_start_address: int = 0,
        mask_data: bytes = bytes(),
        tid_start_address: Optional[int] = None,
        tid_length: Optional[int] = None,
        target: Optional[int] = None,
        antenna: Optional[int] = None,
        scan_time: Optional[int] = None,
    ) -> Iterator[ResponseAnswerMode]:
        """
        8.2.1 Tags inventory

        This is a generator: validation and the request are only performed
        once iteration starts.

        Args:
            statistic_data_flag (bool): Statistic data packet flag.
            strategy_indicator (bool): Strategy indicator (0=general, 1=special).
            fastid_inventory (bool): Impinj FastID function.
            q_value (int): The Q-value (0-15).
            session (int): Session value (0x00, 0x01, 0x02, 0x03, or 0xff).
            mask_memory_bank (int): Mask memory (0x01=EPC, 0x02=TID, 0x03=User).
            mask_start_address (int): Mask entry bit address (0-16383).
            mask_data (bytes): Mask data (at most 31 bytes); the mask bit
                length sent to the reader is ``len(mask_data) * 8``.
            tid_start_address (int, optional): Entry address of inventory TID
                memory (0-255, sent as one byte). Must be given together with
                ``tid_length``; only sent when ``mask_memory_bank`` is 0x02.
            tid_length (int, optional): Data length for TID inventory (0-15).
            target (int, optional): Target value (0x00=A, 0x01=B).
            antenna (int, optional): Antenna selection (0x80-0x83).
            scan_time (int, optional): Inventory scan time (0-255, unit: 100ms).
                ``target``, ``antenna`` and ``scan_time`` must be given
                together or not at all.

        Returns:
            Iterator[ResponseAnswerMode]: Generator yielding inventory responses.

        Raises:
            ValueError: If an argument is out of range or an all-or-nothing
                group is only partially provided.
            TimeoutError: If the reader stops answering.
            Exception: If the reader reports an inventory timeout (status 0x02).
        """
        # Validations with detailed messages
        self._validate_range("QValue", q_value, 0, 15)
        if session not in (0x00, 0x01, 0x02, 0x03, 0xFF):
            raise ValueError(f"Session must be one of [0x00, 0x01, 0x02, 0x03, 0xFF], but got {session}")
        if mask_memory_bank not in (0x01, 0x02, 0x03):
            raise ValueError(f"MaskMem must be 0x01 (EPC), 0x02 (TID), or 0x03 (User), but got {mask_memory_bank}")
        self._validate_range("MaskAdr", mask_start_address, 0, 16383)

        # The mask bit length travels in a single byte.
        mask_length = len(mask_data) * 8
        if mask_length > 0xFF:
            raise ValueError(f"MaskData must be at most 31 bytes, but got {len(mask_data)}")

        tid_given = (tid_start_address is not None, tid_length is not None)
        if any(tid_given):
            if not all(tid_given):
                raise ValueError("AdrTID and LenTID must be provided together.")
            # AdrTID is serialized as one byte below.
            self._validate_range("AdrTID", tid_start_address, 0, 255)
            self._validate_range("LenTID", tid_length, 0, 15)

        extras_given = (target is not None, antenna is not None, scan_time is not None)
        has_extras = all(extras_given)
        if any(extras_given) and not has_extras:
            raise ValueError("Target, Antenna, and ScanTime if one provided, must all be provided.")
        if has_extras:
            if target not in (0x00, 0x01):
                raise ValueError(f"Target must be 0x00 (A) or 0x01 (B), but got {target}")
            if antenna not in (0x80, 0x81, 0x82, 0x83):
                raise ValueError(f"Antenna must be 0x80, 0x81, 0x82, or 0x83, but got {antenna}")
            self._validate_range("ScanTime", scan_time, 0, 255)

        # Construct QValue byte
        qvalue_byte = (
            (int(statistic_data_flag) << 7)
            | (int(strategy_indicator) << 6)
            | (int(fastid_inventory) << 5)
            | (q_value & 0x1F)
        )

        # Required parameters
        params: bytearray = bytearray([qvalue_byte, session, mask_memory_bank])
        params.extend(mask_start_address.to_bytes(2, byteorder="big"))
        params.append(mask_length)
        params.extend(mask_data)

        # AdrTID & LenTID
        if mask_memory_bank == 0x02 and all(tid_given):
            params.extend([tid_start_address, tid_length])

        # Optional parameters (only included if provided)
        if has_extras:
            params.extend([target, antenna, scan_time])

        # Send command
        self._send_request(Command(CMD_INVENTORY_ANSWER_MODE, data=params))

        # Get responses
        while True:
            response = ResponseAnswerMode(self._require_response())
            assert response.command == CMD_INVENTORY_ANSWER_MODE, (
                f"Unexpected response command: expected {hex(CMD_INVENTORY_ANSWER_MODE)}, "
                f"but got {hex(response.command)}"
            )

            if response.status == 0x03:
                yield response
            elif response.status == 0x02:
                raise Exception("Inventory timeout, operation aborted.")
            elif response.status == 0x01:
                break  # Operation completed
utils.py
def calculate_checksum(data: bytes) -> bytearray:
    """Compute the 16-bit frame checksum of ``data``.

    The register starts at 0xFFFF; each byte is XOR-ed in and then shifted
    right eight times, XOR-ing 0x8408 whenever a 1 bit is shifted out.

    Returns:
        Two bytes: low byte first, then high byte.
    """
    crc = 0xFFFF
    for octet in data:
        crc ^= octet
        for _ in range(8):
            carry = crc & 0x0001
            crc >>= 1
            if carry:
                crc ^= 0x8408
    low_byte = crc & 0xFF
    high_byte = crc >> 0x08
    return bytearray([low_byte, high_byte])
def hex_readable(data: bytes | int, bytes_separator: str = " ") -> str:
if isinstance(data, int):
return "{:02X}".format(data)
return bytes_separator.join("{:02X}".format(x) for x in data)
def verify_checksum(response_bytes: bytes) -> bytes:
    """Check the trailing two checksum bytes of a frame.

    The checksum is recomputed over everything except the last two bytes and
    compared with them, in the order ``calculate_checksum`` returns.

    Returns:
        The last two bytes of ``response_bytes``.

    Raises:
        Exception: If the recomputed checksum does not match.
    """
    body = bytearray(response_bytes[:-2])
    expected_first, expected_second = calculate_checksum(body)
    matches = response_bytes[-2] == expected_first and response_bytes[-1] == expected_second
    if not matches:
        raise Exception(f"Checksum verification failed. Response: {hex_readable(response_bytes)}")
    return response_bytes[-2:]
def parse_antenna(hex_value: int) -> list[int]:
    """Return the 1-based positions of the bits set in the low byte of ``hex_value``."""
    return [bit + 1 for bit in range(8) if hex_value & (1 << bit)]
main.py
Silakan uncomment kode yang dibutuhkan.
from response import hex_readable, ActiveModeTag, ActiveModeHeartbeat
from transport import SerialTransport, TcpTransport
from reader import Reader
transport: SerialTransport = SerialTransport('/dev/ttyUSB0', 57600)
# transport: TcpTransport = TcpTransport('192.168.0.250', 27011)

reader: Reader = Reader(transport)

# The active-mode loop never ends on its own, so the connection is released in
# `finally` to make sure it is closed on Ctrl+C or on any error.
try:
    # 1. Inventory - Active mode
    for response in reader.inventory_active_mode():
        if isinstance(response, ActiveModeTag):
            print(f'Antenna: {response.antennas} > RSSI: {response.rssi} > Tag: {hex_readable(response.tag)}')
        if isinstance(response, ActiveModeHeartbeat):
            print('>> Heartbeat')

    #########################################################

    # 2. Inventory - Answer mode
    # for _ in range(10):  # Loop 10 times
    #     for response in reader.inventory_answer_mode(
    #         # mask_memory_bank=0x01,  # EPC
    #         # mask_start_address=0x04 * 8,  # 4th byte
    #         # mask_data=bytearray([0xE2, 0x00, 0xA8]),  # EPC Mask start with: E2 00 A8 ...
    #     ):
    #         if not response.tags:
    #             print("No tag detected")
    #         for tag in response.tags:
    #             print(f"Antenna: {response.antennas} > RSSI: {tag.rssi} > Tag: {hex_readable(tag.tag)}")

    #########################################################

    # 3. Set inventory work mode
    # work_mode: int = 0x00  # 0: Answer mode, 1: Active mode, 2: Trigger mode
    # response_set_work_mode = reader.set_inventory_work_mode(work_mode)
except KeyboardInterrupt:
    print('Stopped by user.')
finally:
    reader.close()
Output
Antenna: [1] > RSSI: 81 > Tag: E2 00 A8 32 FE 55 83 91 CE 26 89 AB
Antenna: [4] > RSSI: 98 > Tag: E2 00 42 51 CC 12 34 56 78 90 AB CD