Komunikasi EL-UHF-RC4 dengan Komputer
Pada artikel ini, kami mencoba berkomunikasi dengan reader EL-UHF-RC4 series menggunakan protokol melalui koneksi Serial (RS232) atau TCP/IP.
Untuk koneksi Serial, diperlukan kabel RS232-to-USB untuk menghubungkan reader ke komputer, sedangkan untuk koneksi TCP/IP, reader dihubungkan ke komputer/router melalui kabel Ethernet.
Kode
🐍 Python
Kode Python di bawah ini adalah versi sederhana. Contoh kode Python yang lengkap tersedia dalam program demo (dibuat menggunakan bahasa Python) yang dapat diunduh di: https://cloud.electron.id/s/wSap3C2PiWXz4Jj
transport.py
import glob
import sys
from enum import Enum
from abc import ABC, abstractmethod
from socket import socket, AF_INET, SOCK_STREAM
from typing import TypeVar
import serial
# Kept so existing references to ``T`` keep working.
T = TypeVar('T', bound='Parent')


class BaudRate(Enum):
    """Serial baud rates selectable for the reader.

    Each member's value is an index into ``BaudRate.INT``, which holds the
    matching speed in bits per second.
    """

    BPS_9600 = 0x00
    BPS_19200 = 0x01
    BPS_38400 = 0x02
    BPS_57600 = 0x03
    BPS_115200 = 0x04

    # ``INT`` is listed in ``_ignore_`` so the enum machinery does not turn it
    # into a member; the real lookup table is attached after the class body.
    _ignore_ = ["INT"]
    INT = []

    def __str__(self) -> str:
        """Return the speed formatted as ``'<bits per second> bps'``."""
        return f'{self.to_int} bps'

    @property
    def to_int(self) -> int:
        """Speed of this member in bits per second."""
        return self.INT[self.value]

    @classmethod
    def from_int(cls, value: int) -> "BaudRate":
        """Return the member whose speed equals *value*.

        Raises:
            ValueError: if *value* is not one of the supported speeds.
        """
        for baud_rate in cls:
            if baud_rate.to_int == value:
                return baud_rate
        # Fail loudly instead of implicitly returning None, which would only
        # surface later as an unrelated AttributeError in the caller.
        raise ValueError(f"Unsupported baud rate: {value}")


# Lookup table indexed by member value (see ``BaudRate.to_int``).
BaudRate.INT = [9600, 19200, 38400, 57600, 115200]
class Transport(ABC):
    """Abstract byte-oriented link to a reader.

    Concrete transports must implement every method below; the class cannot
    be instantiated directly.
    """

    @abstractmethod
    def connect(self, **kwargs) -> None:
        """Open the link."""
        raise NotImplementedError

    @abstractmethod
    def reconnect(self, **kwargs) -> None:
        """Close the link and open it again, optionally with new settings."""
        raise NotImplementedError

    @abstractmethod
    def read_bytes(self, **kwargs) -> bytes:
        """Read bytes from the link and return them."""
        raise NotImplementedError

    @abstractmethod
    def write_bytes(self, buffer: bytes) -> None:
        """Send *buffer* over the link."""
        raise NotImplementedError

    @abstractmethod
    def clear_buffer(self) -> None:
        """Discard any buffered data."""
        raise NotImplementedError

    @abstractmethod
    def close(self) -> None:
        """Close the link."""
        raise NotImplementedError
class TcpTransport(Transport):
    """Transport that talks to the reader over a TCP socket."""

    def __init__(self, ip_address: str, port: int, timeout: int = 6) -> None:
        """Store the connection settings; the socket is opened by ``connect``."""
        self.ip_address: str = ip_address
        self.port: int = port
        self.timeout: int = timeout
        self.socket: socket | None = None

    def __str__(self) -> str:
        return f'TcpTransport(ip_address: {self.ip_address}, port: {self.port}, timeout: {self.timeout})'

    def connect(self) -> None:
        """Create a TCP socket, apply the timeout and connect to the reader."""
        self.socket = socket(AF_INET, SOCK_STREAM)
        self.socket.settimeout(self.timeout)
        self.socket.connect((self.ip_address, self.port))

    def reconnect(self, ip_address: str | None = None, port: int | None = None,
                  timeout: int | None = None) -> None:
        """Close the current socket and connect again.

        Any argument that is omitted (or falsy) keeps its current value.
        """
        self.close()
        self.ip_address = ip_address if ip_address else self.ip_address
        # The port override used to be dropped silently.
        self.port = port if port else self.port
        self.timeout = timeout if timeout else self.timeout
        self.connect()

    def read_bytes(self, length: int) -> bytes:
        """Read up to *length* bytes.

        TCP is a stream, so a single ``recv`` may return fewer bytes than
        requested; keep reading until *length* bytes have arrived or the peer
        closes the connection (in which case the bytes received so far are
        returned). Socket timeouts propagate to the caller.
        """
        received = bytearray()
        while len(received) < length:
            chunk = self.socket.recv(length - len(received))
            if not chunk:  # peer closed the connection
                break
            received.extend(chunk)
        return bytes(received)

    def write_bytes(self, buffer: bytes) -> None:
        """Send the whole *buffer*."""
        self.socket.sendall(buffer)

    def clear_buffer(self) -> None:
        """Do nothing.

        NOTE(review): a draining ``recv(1024)`` was disabled here by the
        original author, presumably because it would block until the timeout
        when no data is pending; confirm before re-enabling.
        """
        pass

    def close(self) -> None:
        """Close the socket if one has been opened."""
        if self.socket is not None:
            self.socket.close()
class SerialTransport(Transport):
    """Transport that talks to the reader over a serial port (pySerial)."""

    def __init__(self, serial_port: str, baud_rate: BaudRate, timeout: float = 0.5) -> None:
        """Open *serial_port* immediately.

        The write timeout is set to twice the read *timeout*.
        """
        self.serial_port = serial_port
        self.baud_rate = baud_rate
        self.timeout = timeout
        self.serial = serial.Serial(self.serial_port, self.baud_rate.to_int,
                                    timeout=timeout, write_timeout=timeout * 2)

    def __str__(self) -> str:
        return f'SerialTransport(port: {self.serial_port}, baud_rate: {self.baud_rate})'

    @classmethod
    def scan(cls, timeout: float = 1) -> list[str]:
        """Return the names of the serial ports that can currently be opened.

        Candidate names depend on the platform (COM1..COM15 on Windows, glob
        patterns under ``/dev`` elsewhere). Each candidate is opened and
        closed again; ports that raise ``serial.SerialException`` are skipped.

        Raises:
            EnvironmentError: on an unsupported platform.
        """
        result: list[str] = []
        if sys.platform.startswith("win"):  # Windows
            ports = ["COM%s" % (i + 1) for i in range(15)]
        elif sys.platform.startswith(("linux", "cygwin")):  # Linux
            # this excludes your current terminal "/dev/tty"
            ports = glob.glob("/dev/tty[A-Za-z]*")
        elif sys.platform.startswith("darwin"):  # Mac OS
            ports = glob.glob("/dev/tty.*")
        else:
            raise EnvironmentError("Unsupported platform")
        for port in ports:
            try:
                s = serial.Serial(port, timeout=timeout)
                s.close()
                result.append(port)
            except serial.SerialException:
                # Port is missing, busy or not accessible: not a candidate.
                pass
        return result

    def connect(self, **kwargs) -> None:
        """Do nothing: the port is already opened by ``__init__``/``reconnect``."""
        pass

    def reconnect(self, serial_port: str | None = None, baud_rate: BaudRate | None = None,
                  timeout: float | None = None) -> None:
        """Close the port and open it again.

        Any argument that is omitted (or falsy) falls back to the setting of
        the port object that was just closed.
        """
        self.close()
        self.serial_port = serial_port if serial_port else self.serial.port
        self.baud_rate = baud_rate if baud_rate else BaudRate.from_int(self.serial.baudrate)
        self.timeout = timeout if timeout else self.serial.timeout
        self.serial = serial.Serial(self.serial_port, self.baud_rate.to_int,
                                    timeout=self.timeout, write_timeout=self.timeout * 2)

    def read_bytes(self, length: int) -> bytes:
        """Read up to *length* bytes from the port."""
        response = self.serial.read(length)
        return response

    def write_bytes(self, buffer: bytes) -> None:
        """Write *buffer* to the port."""
        self.serial.write(buffer)

    def clear_buffer(self) -> None:
        """Reset both the input and the output buffer of the port."""
        self.serial.reset_input_buffer()
        self.serial.reset_output_buffer()

    def close(self) -> None:
        """Close the port."""
        self.serial.close()
command.py
from enum import Enum
from utils import calculate_checksum
# First byte of every frame: Command.serialize() emits it at the start of a
# request, and it is the value expected at the start of a response.
HEADER = 0xCF
# Default reader address used by Command when the caller does not pass one.
BROADCAST_ADDRESS = 0xFF
class CommandOption(Enum):
    """Option codes SET and GET.

    NOTE(review): not referenced by the code shown here; presumably selects
    whether a parameter command writes or reads a setting - confirm against
    the protocol documentation.
    """

    SET = 0x01
    GET = 0x02
class CommandRequest(Enum):
    """Command codes; each value is serialized as two big-endian bytes."""

    SET_POWER = 0x0053
    GET_DEVICE_INFO = 0x0070
    INVENTORY_ISO_CONTINUE = 0x0001
    INVENTORY_STOP = 0x0002
    # NOTE(review): same value as INVENTORY_ISO_CONTINUE, so Enum treats this
    # name as an alias of that member (CommandRequest(0x0001) returns
    # INVENTORY_ISO_CONTINUE). Confirm the intended code against the protocol
    # documentation.
    INVENTORY_ACTIVE = 0x0001
class Command(object):
    """A request frame to be sent to the reader.

    Serialized layout: HEADER, address, command code (2 bytes, big-endian),
    data length (1 byte), data, then the optional 2-byte checksum.
    """

    def __init__(self,
                 command: CommandRequest,
                 address=BROADCAST_ADDRESS,
                 data: bytes | bytearray | None = None) -> None:
        """Create a command.

        Args:
            command: command code to send.
            address: reader address; defaults to the broadcast address.
            data: command payload; an empty payload when omitted.
        """
        self.address = address
        self.command = command
        # Build a fresh buffer per instance: a ``bytearray()`` default in the
        # signature would be one object shared (and mutable) across commands.
        self.data = bytearray() if data is None else data

    def serialize(self, with_checksum: bool = True) -> bytearray:
        """Return the frame bytes, with the checksum appended unless disabled.

        Raises:
            ValueError: if the payload is longer than 255 bytes (the length
                field is a single byte).
        """
        base_data = bytearray([HEADER, self.address])
        base_data += self.command.value.to_bytes(2, "big")
        base_data.append(len(self.data))
        base_data += bytearray(self.data)
        if with_checksum:
            # The checksum covers everything serialized so far.
            checksum = calculate_checksum(base_data)
            base_data.extend(checksum)
        return base_data
response.py
from dataclasses import dataclass
from enum import Enum
from command import HEADER, CommandRequest
from utils import calculate_checksum, hex_readable
class Status(Enum):
    """Status codes; ``Response`` maps the status byte of a frame to a member."""

    SUCCESS = 0x00
    WRONG_PARAM = 0x01
    CMD_EXECUTION_FAILED = 0x02
    RESERVE = 0x03
    NO_COUNT_LABEL = 0x12
    TIMEOUT = 0x14
    TAG_RESPONSE_ERROR = 0x15
    AUTHENTICATION_FAILED = 0x16
    WRONG_PASSWORD = 0x17
    NO_MORE_DATA = 0xFF
class TagStatus(Enum):
    """Tag-level status codes.

    NOTE(review): not referenced by the code shown here; meanings are taken
    from the member names only - confirm against the protocol documentation.
    """

    NO_ERROR = 0xFF
    TIMEOUT = 0x14
    OTHER_ERROR = 0x81
    STORAGE_AREA_ERROR = 0x82
    STORAGE_LOCK = 0x83
    INSUFFICIENT_POWER = 0x84
    NO_POWER = 0x85
class InventoryStatus(Enum):
    """Status codes for inventory responses.

    NOTE(review): imported by reader.py but not used in the code shown here;
    confirm the meanings against the protocol documentation.
    """

    SUCCESS = 0x00
    WRONG_PARAM = 0x01
    CMD_EXECUTION_FAILED = 0x02
    NO_COUNT_LABEL = 0x12
    EXCEED_MAX_TRANSMIT_SERIAL = 0x17
@dataclass
class Tag:
    """One tag report, as built from an inventory response payload."""

    rssi: bytes
    antenna: int
    channel: int
    data: bytes
    count: int = 1

    def __str__(self) -> str:
        """Return a one-line summary with rssi and data as spaced hex bytes."""
        rssi_hex = hex_readable(self.rssi)
        data_hex = hex_readable(self.data)
        return (f'Tag(rssi: {rssi_hex}, '
                f'antenna: {self.antenna}, channel: {self.channel}, '
                f'data: {data_hex})')
class Response:
    """A parsed and checksum-verified response frame.

    Frame layout: header (1 byte), address (1), command code (2, big-endian),
    length (1), status (1), payload (length - 1 bytes), checksum (2).

    Attributes set on success: ``header``, ``address``, ``command``,
    ``length``, ``status``, ``payload`` and ``checksum``.
    """

    def __init__(self, response: bytes) -> None:
        """Parse *response*.

        Raises:
            ValueError: if *response* is None, too short or truncated, does
                not start with the frame header, carries an unknown command
                or status code, or fails checksum verification.
        """
        if response is None:
            raise ValueError("Response is None")
        # Smallest valid frame: 5 header bytes + status + 2 checksum bytes.
        if len(response) < 8:
            raise ValueError(f"Response too short: {len(response)} byte(s)")

        header_section: bytes = response[0:5]
        # Explicit checks instead of ``assert`` so validation of device data
        # still runs when Python is started with -O.
        if header_section[0] != HEADER:
            raise ValueError(f"Invalid response header: 0x{header_section[0]:02X}")

        self.header: int = response[0]
        self.address: int = response[1]
        self.command: CommandRequest = CommandRequest(int.from_bytes(response[2:4], "big"))
        self.length: int = response[4]

        # The length field counts the status byte plus the payload.
        frame_end = 5 + self.length + 2
        if self.length < 1 or len(response) < frame_end:
            raise ValueError("Response is truncated")

        self.status: Status = Status(response[5])
        body_and_checksum: bytes = response[6:frame_end]
        self.payload: bytes = body_and_checksum[0:-2]
        self.checksum: bytes = body_and_checksum[-2:]

        # Verify checksum over header section + status + payload.
        data = bytearray(header_section)
        data.append(self.status.value)
        if self.payload:
            data.extend(self.payload)
        crc_msb, crc_lsb = calculate_checksum(data)
        if self.checksum[0] != crc_msb or self.checksum[1] != crc_lsb:
            raise ValueError("Response checksum mismatch")
reader.py
from enum import Enum
from typing import Iterator
from command import CommandRequest, Command
from response import Response, InventoryStatus, Tag
from transport import Transport
from utils import hex_readable
class StopType(Enum):
    """How an inventory run is limited.

    TIME: stop according to the time (in seconds).
    NUMBER: stop according to the number of cycles.
    """

    TIME = 0x00
    NUMBER = 0x01
class Reader:
    """High-level access to the reader over a ``Transport``."""

    def __init__(self, transport: Transport) -> None:
        """Store *transport* and connect it."""
        super().__init__()
        self.transport = transport
        self.transport.connect()

    def close(self) -> None:
        """Close the underlying transport."""
        self.transport.close()

    def __receive_response(self) -> bytes | None:
        """Read one raw response frame.

        Returns:
            The frame bytes, or None when nothing at all was received.

        Raises:
            RuntimeError: if only part of a frame was received.
        """
        # Header section: 5 bytes, the last one being the body length.
        header = self.transport.read_bytes(length=5)
        if not header:
            return None
        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if len(header) != 5:
            raise RuntimeError(f"Incomplete response header: got {len(header)} of 5 bytes.")

        # Body section, followed by the 2 checksum bytes.
        expected = header[-1] + 2
        body = self.transport.read_bytes(length=expected)
        if len(body) != expected:
            raise RuntimeError(f"Incomplete response body: got {len(body)} of {expected} bytes.")
        return header + body

    def __execute(self, command: Command) -> Response:
        """Send *command* and return the response that answers it.

        Raises:
            RuntimeError: if no response arrives or the response belongs to a
                different command.
        """
        self.transport.write_bytes(command.serialize())
        raw_response: bytes | None = self.__receive_response()
        if raw_response is None:
            raise RuntimeError("No response from reader.")
        response: Response = Response(raw_response)
        if response.command != command.command:
            raise RuntimeError(
                f"Unexpected response command: {response.command} (expected {command.command}).")
        return response

    def set_power(self, power: int) -> Response:
        """Send SET_POWER with *power* (one byte, followed by 0x00) and return the reply."""
        return self.__execute(Command(CommandRequest.SET_POWER, data=bytearray([power, 0x00])))

    def start_inventory_answer_mode(self, stop_type: StopType, value: int) -> Iterator[Tag]:
        """Start an inventory and yield one ``Tag`` per response received.

        The request payload is the stop type followed by *value* as four
        big-endian bytes. Iteration ends when nothing is received, when an
        INVENTORY_STOP response arrives, or when a response has no payload.
        """
        data = bytearray([stop_type.value])
        data.extend(value.to_bytes(4, "big"))
        command = Command(CommandRequest.INVENTORY_ISO_CONTINUE, data=data)

        # Send request
        self.transport.write_bytes(command.serialize())

        while True:
            raw_response: bytes | None = self.__receive_response()
            if raw_response is None:
                return
            response: Response = Response(raw_response)
            if response.command == CommandRequest.INVENTORY_STOP or not response.payload:
                print("Inventory finished!")
                break
            # payload[4] is skipped - presumably the length of the tag data;
            # TODO confirm against the protocol documentation.
            yield Tag(
                rssi=response.payload[0:2],
                antenna=response.payload[2],
                channel=response.payload[3],
                data=response.payload[5:])

    def stop_inventory_answer_mode(self) -> Response:
        """Send INVENTORY_STOP and return the reply.

        A missing reply raises RuntimeError, the same as ``set_power``.
        """
        return self.__execute(Command(CommandRequest.INVENTORY_STOP))
utils.py
from array import array
def hex_readable(data_bytes: bytes | array, separator: str = " ") -> str:
return separator.join("{:02X}".format(x) for x in data_bytes)
def ip_bytes(ip_str: str) -> bytearray:
    """Convert a dotted-quad IPv4 string into its four bytes.

    Raises:
        ValueError: if *ip_str* does not have exactly four dot-separated
            parts, a part is not an integer, or a part is outside 0..255.
    """
    octets = ip_str.split('.')
    # Explicit check instead of ``assert`` so malformed input is still
    # rejected under ``python -O``; ValueError matches what the conversions
    # below raise for the other kinds of invalid input.
    if len(octets) != 4:
        raise ValueError(f"Invalid IPv4 address: {ip_str!r}")
    return bytearray(int(octet) for octet in octets)
def calculate_checksum(data: bytes) -> bytearray:
    """Compute the 16-bit checksum of *data*.

    Bitwise CRC processed least-significant bit first, with polynomial
    constant 0x8408, initial value 0xFFFF and no final XOR.

    Returns:
        Two bytes: the high byte of the checksum followed by the low byte.
    """
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _bit in range(8):
            lsb_set = crc & 0x0001
            crc >>= 1
            if lsb_set:
                crc ^= 0x8408
    high_byte = crc >> 0x08
    low_byte = crc & 0xFF
    return bytearray([high_byte, low_byte])
def calculate_rssi(rssi: bytes) -> int:
    """Interpret *rssi* as a signed big-endian integer."""
    signed_value = int.from_bytes(rssi, "big", signed=True)
    return signed_value
main.py
from typing import Iterator
from response import Tag
from transport import Transport, SerialTransport, BaudRate, TcpTransport
from reader import Reader, StopType
# Pick one transport: serial (default) or TCP/IP.
transport: Transport = SerialTransport('/dev/ttyUSB0', BaudRate.BPS_115200)
# transport: Transport = TcpTransport('192.168.1.250', 2022)
reader: Reader = Reader(transport)

try:
    # 1. Inventory answer mode
    tags: Iterator[Tag] = reader.start_inventory_answer_mode(stop_type=StopType.TIME, value=3)  # Stop after 3 seconds
    for tag in tags:
        print(tag)

    # 2. Set power
    # reader.set_power(10)
finally:
    # Release the port/socket even if the inventory raises.
    reader.close()