Komunikasi EL-UHF-RC4 dengan Komputer
Pada artikel ini, kami mencoba berkomunikasi dengan reader EL-UHF-RC4 series menggunakan protokol melalui koneksi Serial (RS232) atau TCP/IP.
Untuk koneksi Serial, diperlukan kabel RS232 to USB untuk menghubungkan reader ke komputer. Sedangkan untuk koneksi TCP/IP, reader terhubung ke komputer/router melalui kabel Ethernet.
Kode
🐍 Python
Kode Python di bawah ini adalah versi sederhana. Contoh kode Python lengkap tersedia dalam program demo (dibuat menggunakan bahasa Python), link download: https://cloud.electron.id/s/wSap3C2PiWXz4Jj
transport.py
import glob
import sys
from enum import Enum
from abc import ABC, abstractmethod
from socket import socket, AF_INET, SOCK_STREAM
from typing import TypeVar
import serial
# Type variable used as the return annotation of BaudRate.from_int below.
# NOTE(review): the bound 'Parent' is a forward reference to a name that is
# not defined in the visible code -- confirm what it was meant to be.
T = TypeVar('T', bound='Parent')
class BaudRate(Enum):
    """Baud-rate codes paired with their speed in bits per second.

    A member's value is the position of its speed inside ``BaudRate.INT``.
    """

    BPS_9600 = 0x00
    BPS_19200 = 0x01
    BPS_38400 = 0x02
    BPS_57600 = 0x03
    BPS_115200 = 0x04

    # ``_ignore_`` keeps the placeholder from becoming an enum member; the
    # real table is attached to the class right after its definition.
    _ignore_ = ["INT"]
    INT = []

    def __str__(self) -> str:
        return f'{self.to_int} bps'

    @property
    def to_int(self) -> int:
        """Speed of this member in bits per second."""
        speed_table = self.INT
        return speed_table[self.value]

    @classmethod
    def from_int(cls, value: int) -> T:
        """Return the member whose speed equals ``value``, or ``None`` if no member matches."""
        candidates = (member for member in BaudRate if member.to_int == value)
        return next(candidates, None)


BaudRate.INT = [9600, 19200, 38400, 57600, 115200]
class Transport(ABC):
    """Abstract byte channel between the host and the reader.

    Concrete transports must implement every method below; the class cannot
    be instantiated directly.
    """

    @abstractmethod
    def connect(self, **kwargs) -> None:
        """Establish the connection (an implementation may make this a no-op)."""
        raise NotImplementedError

    @abstractmethod
    def reconnect(self, **kwargs) -> None:
        """Close the current connection and open it again, optionally with new settings.

        Annotated as returning ``None`` because that is what the concrete
        transports in this module return.
        """
        raise NotImplementedError

    @abstractmethod
    def read_bytes(self, **kwargs) -> bytes:
        """Read bytes from the channel and return them."""
        raise NotImplementedError

    @abstractmethod
    def write_bytes(self, buffer: bytes) -> None:
        """Send ``buffer`` over the channel."""
        raise NotImplementedError

    @abstractmethod
    def clear_buffer(self) -> None:
        """Discard pending buffered data, where the transport supports it."""
        raise NotImplementedError

    @abstractmethod
    def close(self) -> None:
        """Release the underlying connection."""
        raise NotImplementedError
class TcpTransport(Transport):
def __init__(self, ip_address: str, port: int, timeout: int = 6) -> None:
self.ip_address: str = ip_address
self.port: int = port
self.timeout: int = timeout
self.socket: socket | None = None
def __str__(self) -> str:
return f'TcpTransport(ip_address: {self.ip_address}, port: {self.port}, timeout: {self.timeout})'
def connect(self) -> None:
self.socket = socket(AF_INET, SOCK_STREAM)
self.socket.settimeout(self.timeout)
self.socket.connect((self.ip_address, self.port))
def reconnect(self, ip_address: str | None = None, port: int | None = None,
timeout: int | None = None) -> None:
self.socket.close()
self.ip_address = ip_address if ip_address else self.ip_address
self.timeout = timeout if timeout else self.timeout
self.connect()
def read_bytes(self, length: int) -> bytes:
return self.socket.recv(length)
def write_bytes(self, buffer: bytes) -> None:
self.socket.sendall(buffer)
def clear_buffer(self) -> None:
# self.socket.recv(1024)
pass
def close(self) -> None:
self.socket.close()
class SerialTransport(Transport):
    """Transport that exchanges bytes with the reader over a serial port (pyserial)."""

    def __init__(self, serial_port: str, baud_rate: BaudRate, timeout: float = 0.5) -> None:
        """Create the ``serial.Serial`` object for ``serial_port``.

        Args:
            serial_port: Device name, e.g. ``/dev/ttyUSB0`` or ``COM3``.
            baud_rate: Speed of the link.
            timeout: Read timeout in seconds; the write timeout is twice this value.
        """
        self.serial_port = serial_port
        self.baud_rate = baud_rate
        self.timeout = timeout
        self.serial = serial.Serial(self.serial_port, self.baud_rate.to_int,
                                    timeout=timeout, write_timeout=timeout * 2)

    def __str__(self) -> str:
        return f'SerialTransport(port: {self.serial_port}, baud_rate: {self.baud_rate})'

    @classmethod
    def scan(cls, timeout: float = 1) -> list[str]:
        """Return the candidate serial ports that can actually be opened.

        Raises:
            EnvironmentError: on a platform other than Windows, Linux/Cygwin or macOS.
        """
        if sys.platform.startswith("win"):  # Windows
            ports = ["COM%s" % (i + 1) for i in range(15)]
        elif sys.platform.startswith(("linux", "cygwin")):  # Linux
            # this excludes your current terminal "/dev/tty"
            ports = glob.glob("/dev/tty[A-Za-z]*")
        elif sys.platform.startswith("darwin"):  # Mac OS
            ports = glob.glob("/dev/tty.*")
        else:
            raise EnvironmentError("Unsupported platform")

        result: list[str] = []
        for port in ports:
            try:
                serial.Serial(port, timeout=timeout).close()
            except serial.SerialException:
                # Best-effort probe: a port that cannot be opened is simply skipped.
                continue
            result.append(port)
        return result

    def connect(self, **kwargs):
        """No-op: the serial object is already created in ``__init__``/``reconnect``."""
        pass

    def reconnect(self, serial_port: str | None = None, baud_rate: BaudRate | None = None,
                  timeout: float | None = None) -> None:
        """Close the port and reopen it, optionally with new settings.

        Arguments left as ``None`` (or otherwise falsy) fall back to the
        values of the current serial object.
        """
        self.close()
        self.serial_port = serial_port if serial_port else self.serial.port
        # from_int() yields None for a speed outside the BaudRate table; keep
        # the previously stored member then instead of failing on None.to_int.
        current_baud_rate = BaudRate.from_int(self.serial.baudrate)
        self.baud_rate = baud_rate or current_baud_rate or self.baud_rate
        self.timeout = timeout if timeout else self.serial.timeout
        self.serial = serial.Serial(self.serial_port, self.baud_rate.to_int,
                                    timeout=self.timeout, write_timeout=self.timeout * 2)

    def read_bytes(self, length: int) -> bytes:
        """Read up to ``length`` bytes from the port and return them."""
        return self.serial.read(length)

    def write_bytes(self, buffer: bytes) -> None:
        """Write ``buffer`` to the port."""
        self.serial.write(buffer)

    def clear_buffer(self) -> None:
        """Reset both the input and the output buffer of the port."""
        self.serial.reset_input_buffer()
        self.serial.reset_output_buffer()

    def close(self) -> None:
        """Close the serial port."""
        self.serial.close()
command.py
from enum import Enum
from utils import calculate_checksum
# First byte of every frame: Command.serialize() emits it first and
# Response.__init__ checks the first received byte against it.
HEADER = 0xCF
# Default value of Command's ``address`` argument.
BROADCAST_ADDRESS = 0xFF
class CommandOption(Enum):
    """SET/GET option codes.

    NOTE(review): not referenced by the code visible in this file.
    """

    SET = 0x01
    GET = 0x02
class CommandRequest(Enum):
    """Two-byte command codes placed in request frames and echoed in responses."""

    SET_ALL_PARAM = 0x0071
    GET_ALL_PARAM = 0x0072

    INVENTORY_ISO_CONTINUE = 0x0001
    INVENTORY_STOP = 0x0002
    # Same value as INVENTORY_ISO_CONTINUE, so the enum machinery makes this
    # an alias: CommandRequest(0x0001) resolves to INVENTORY_ISO_CONTINUE.
    INVENTORY_ACTIVE = 0x0001
class Command(object):
    """A request frame for the reader.

    Serialized layout: HEADER, address, command code (2 bytes, big-endian),
    data length (1 byte), data, then an optional checksum.
    """

    def __init__(self,
                 command: CommandRequest,
                 address=BROADCAST_ADDRESS,
                 data: bytes | bytearray | None = None) -> None:
        """Store the frame fields.

        Args:
            command: Command code to send.
            address: Reader address; defaults to ``BROADCAST_ADDRESS``.
            data: Command payload. ``None`` (the default) means an empty payload.
        """
        self.address = address
        self.command = command
        # A fresh buffer per instance: a ``bytearray()`` default argument would
        # be one shared object, so mutating one command's data would leak into
        # every other command created without an explicit payload.
        self.data = bytearray() if data is None else data

    def serialize(self, with_checksum: bool = True) -> bytes:
        """Build the frame bytes.

        Args:
            with_checksum: Append the bytes returned by ``calculate_checksum``
                for the frame built so far.

        Returns:
            The frame as a ``bytearray``.

        Raises:
            ValueError: if the payload is longer than 255 bytes (the length
                field is a single byte).
        """
        frame = bytearray([HEADER, self.address])
        frame.extend(self.command.value.to_bytes(2, "big"))
        frame.append(len(self.data))
        frame.extend(self.data)
        if with_checksum:
            frame.extend(calculate_checksum(frame))
        return frame
response.py
from dataclasses import dataclass
from enum import Enum
from typing import TypeVar, Type
from command import HEADER, CommandRequest
from utils import calculate_checksum, hex_readable, calculate_rssi
# Type variable used in the ``cls: Type[T]`` / ``-> T`` annotations of the
# classmethod constructors in this module.
# NOTE(review): the bound 'Parent' is a forward reference to a name that is
# not defined in the visible code -- confirm what it was meant to be.
T = TypeVar('T', bound='Parent')
class Status(Enum):
    """Status codes accepted for the status byte of a response frame."""

    SUCCESS = 0x00
    WRONG_PARAM = 0x01
    CMD_EXECUTION_FAILED = 0x02
    RESERVE = 0x03

    NO_COUNT_LABEL = 0x12
    TIMEOUT = 0x14
    TAG_RESPONSE_ERROR = 0x15
    AUTHENTICATION_FAILED = 0x16
    WRONG_PASSWORD = 0x17

    NO_MORE_DATA = 0xFF
class TagStatus(Enum):
    """Tag-level status codes.

    NOTE(review): not referenced by the code visible in this file.
    """

    NO_ERROR = 0xFF
    TIMEOUT = 0x14

    OTHER_ERROR = 0x81
    STORAGE_AREA_ERROR = 0x82
    STORAGE_LOCK = 0x83
    INSUFFICIENT_POWER = 0x84
    NO_POWER = 0x85
class InventoryStatus(Enum):
    """Status codes used to interpret the status byte of inventory responses."""

    SUCCESS = 0x00
    WRONG_PARAM = 0x01
    CMD_EXECUTION_FAILED = 0x02

    NO_COUNT_LABEL = 0x12
    EXCEED_MAX_TRANSMIT_SERIAL = 0x17
@dataclass
class Tag:
    """One tag report produced during an inventory."""

    rssi: bytes  # raw RSSI bytes; decoded with calculate_rssi() when printed
    antenna: int
    channel: int
    data: bytes
    count: int = 1

    def __str__(self) -> str:
        # Only the first three characters of the decoded RSSI are shown.
        rssi_text = str(calculate_rssi(self.rssi))[0:3]
        data_text = hex_readable(self.data)
        return f'Tag(rssi: {rssi_text}, ' \
               f'antenna: {self.antenna}, channel: {self.channel}, ' \
               f'data: {data_text})'
class Response:
    """A parsed reply frame from the reader.

    Layout as consumed here: header (1 byte), address (1), command (2,
    big-endian), length (1), status (1), payload, checksum (2). The length
    byte covers the status byte plus the payload.
    """

    def __init__(self, response: bytes) -> None:
        """Parse ``response`` and verify its header byte and checksum.

        Raises:
            ValueError: if ``response`` is ``None``, or the command/status
                code is not a known enum value.
            AssertionError: if the header byte or the checksum is wrong.
                These used to be ``assert`` statements, which vanish under
                ``python -O``; they are raised explicitly now, and the
                exception type is kept so existing handlers still match.
            IndexError: if the frame is too short to contain the fixed fields.
        """
        if response is None:
            raise ValueError("Response is None")

        header_section: bytes = response[0:5]
        if header_section[0] != HEADER:
            raise AssertionError(f"Unexpected response header: 0x{header_section[0]:02X}")

        self.header: int = response[0]
        self.address: int = response[1]
        self.command: CommandRequest = CommandRequest(int.from_bytes(response[2:4], "big"))
        self.length: int = response[4]
        self.status: Status = Status(response[5])

        # Everything after the status byte: payload followed by 2 checksum bytes.
        frame_end = 5 + self.length + 2
        body_and_checksum: bytes = response[6:frame_end]
        self.payload: bytes = body_and_checksum[0:-2]
        self.checksum: bytes = body_and_checksum[-2:]

        # The checksum covers the 5-byte header section, the status and the payload.
        checked = bytearray(header_section)
        checked.append(self.status.value)
        if self.payload:
            checked.extend(self.payload)
        crc_msb, crc_lsb = calculate_checksum(checked)
        if self.checksum[0] != crc_msb or self.checksum[1] != crc_lsb:
            raise AssertionError("Response checksum mismatch")

    def __str__(self) -> str:
        response = [
            "<<< START RESPONSE ================================",
            f"COMMAND >> {self.command}",
            f"STATUS >> {self.status}",
        ]
        if self.payload:
            response.append(f"PAYLOAD >> {hex_readable(self.payload)}")
        response.append("<<< END RESPONSE ================================")
        return "\n".join(response).strip().upper()
class RfidProtocol(Enum):
    """RFID protocol codes; ``str()`` gives the display label.

    NOTE(review): the original note here read "Only for ISO 18000-6C" --
    confirm which protocols are actually supported.
    """

    ISO_18000_6C = 0x00
    GBT_29768 = 0x01
    GJB_7377_1 = 0x02

    # Placeholder excluded from the members; the labels are attached below.
    _ignore_ = ["DISPLAY_STRINGS"]
    DISPLAY_STRINGS = []

    def __str__(self) -> str:
        labels = RfidProtocol.DISPLAY_STRINGS
        return labels[self.value]


RfidProtocol.DISPLAY_STRINGS = ["ISO 18000-6C", "GB/T 29768", "GJB 7377.1"]
class BaudRate(Enum):
    """Baud-rate codes paired with their speed in bits per second.

    A member's value is the position of its speed inside ``BaudRate.INT``.
    """

    BPS_9600 = 0x00
    BPS_19200 = 0x01
    BPS_38400 = 0x02
    BPS_57600 = 0x03
    BPS_115200 = 0x04

    # ``_ignore_`` keeps the placeholder from becoming an enum member; the
    # real table is attached to the class right after its definition.
    _ignore_ = ["INT"]
    INT = []

    def __str__(self) -> str:
        return f'{self.to_int} bps'

    @property
    def to_int(self) -> int:
        """Speed of this member in bits per second."""
        speed_table = self.INT
        return speed_table[self.value]

    @classmethod
    def from_int(cls, value: int) -> T:
        """Return the member whose speed equals ``value``, or ``None`` if no member matches."""
        candidates = (member for member in BaudRate if member.to_int == value)
        return next(candidates, None)


BaudRate.INT = [9600, 19200, 38400, 57600, 115200]
class Relay(Enum):
    """Relay state codes (values start at 1); ``str()`` gives the display label."""

    OPEN = 0x01
    CLOSE = 0x02

    # Placeholder excluded from the members; the labels are attached below.
    _ignore_ = ["DISPLAY_STRINGS"]
    DISPLAY_STRINGS = []

    def __str__(self) -> str:
        return Relay.DISPLAY_STRINGS[self.to_index()]

    def to_index(self) -> int:
        """Zero-based position of this member (its value minus one)."""
        zero_based = self.value - 1
        return zero_based


Relay.DISPLAY_STRINGS = ["Open", "Close"]
class WorkMode(Enum):
    """Reader work-mode codes; ``str()`` gives the display label."""

    ANSWER_MODE = 0x00
    ACTIVE_MODE = 0x01
    TRIGGER_MODE = 0x02

    # Placeholder excluded from the members; the labels are attached below.
    _ignore_ = ["DISPLAY_STRINGS"]
    DISPLAY_STRINGS = []

    def __str__(self) -> str:
        labels = WorkMode.DISPLAY_STRINGS
        return labels[self.value]


WorkMode.DISPLAY_STRINGS = ["Answer Mode", "Active Mode", "Trigger Mode"]
class OutputInterface(Enum):
    """Output interface codes.

    Display labels are looked up by definition order (``index``), not by
    value, so ``DISPLAY_STRINGS`` must list one label per member in the same
    order as the members below.
    """

    WIEGAND = 0x99
    RS232 = 0x80
    RS485 = 0x40
    RJ45 = 0x20
    # WiFi = 0x10
    USB = 0x01
    KEYBOARD = 0x02
    # CDC_COM = 0x04

    # Placeholder excluded from the members; the labels are attached below.
    _ignore_ = ["DISPLAY_STRINGS"]
    DISPLAY_STRINGS = []

    @property
    def index(self) -> int:
        """Position of this member in definition order."""
        positions = (position for position, member in enumerate(OutputInterface)
                     if member == self)
        return next(positions, None)

    def __str__(self) -> str:
        labels = OutputInterface.DISPLAY_STRINGS
        return labels[self.index]

    @classmethod
    def from_index(cls: Type[T], index: int) -> T:
        """Return the member at definition position ``index``, or ``None`` if out of range."""
        matches = (member for position, member in enumerate(OutputInterface)
                   if position == index)
        return next(matches, None)


OutputInterface.DISPLAY_STRINGS = [
    "Wiegand",
    "RS232",
    "RS485",
    "RJ45",
    "USB",
    "Keyboard",
    # "CDC_COM"
]
class Session(Enum):
    """Session codes 0-3; ``str()`` gives the short label (S0..S3)."""

    SESSION_0 = 0x00
    SESSION_1 = 0x01
    SESSION_2 = 0x02
    SESSION_3 = 0x03

    # Placeholder excluded from the members; the labels are attached below.
    _ignore_ = ["DISPLAY_STRINGS"]
    DISPLAY_STRINGS = []

    def __str__(self) -> str:
        labels = Session.DISPLAY_STRINGS
        return labels[self.value]


Session.DISPLAY_STRINGS = ["S0", "S1", "S2", "S3"]
class MemoryBank(Enum):
    """Tag memory bank codes; ``str()`` gives the display label."""

    PASSWORD = 0x00
    EPC = 0x01
    TID = 0x02
    USER = 0x03

    # Placeholder excluded from the members; the labels are attached below.
    _ignore_ = ["DISPLAY_STRINGS"]
    DISPLAY_STRINGS = []

    def __str__(self) -> str:
        labels = MemoryBank.DISPLAY_STRINGS
        return labels[self.value]


MemoryBank.DISPLAY_STRINGS = ["Password", "EPC", "TID", "User"]
class LockMemoryBank(Enum):
    """Memory areas that can be locked or unlocked.

    Values are not contiguous (0x03 is skipped), so display labels are looked
    up by definition order via ``to_index``. ``DISPLAY_STRINGS`` must
    therefore list one label per member in the same order as the members.
    """

    KILL_PASSWORD = 0x00
    ACCESS_PASSWORD = 0x01
    EPC = 0x02
    # TID = 0x03  # Unlock/lock response is None
    USER = 0x04

    # Placeholder excluded from the members; the labels are attached below.
    _ignore_ = ["DISPLAY_STRINGS"]
    DISPLAY_STRINGS = []

    def to_index(self) -> int:
        """Position of this member in definition order."""
        for index, lock_memory_bank in enumerate(LockMemoryBank):
            if lock_memory_bank.value == self.value:
                return index

    @classmethod
    def from_index(cls: Type[T], index: int) -> T:
        """Return the member at definition position ``index``, or ``None`` if out of range."""
        for i, value in enumerate(LockMemoryBank):
            if index == i:
                return value
        return None

    def __str__(self) -> str:
        return LockMemoryBank.DISPLAY_STRINGS[self.to_index()]


# Order matches the member order above. The first two labels used to be
# swapped, so KILL_PASSWORD printed "Access Password" and vice versa.
LockMemoryBank.DISPLAY_STRINGS = [
    "Kill Password",
    "Access Password",
    "EPC",
    "User"
]
class LockAction(Enum):
    """Lock action codes; ``str()`` gives the display label."""

    UNLOCK = 0x00
    UNLOCK_PERMANENT = 0x01
    LOCK = 0x02
    LOCK_PERMANENT = 0x03

    # Placeholder excluded from the members; the labels are attached below.
    _ignore_ = ["DISPLAY_STRINGS"]
    DISPLAY_STRINGS = []

    def __str__(self) -> str:
        labels = LockAction.DISPLAY_STRINGS
        return labels[self.value]


LockAction.DISPLAY_STRINGS = [
    "Unlock",
    "Unlock (Permanent)",
    "Lock",
    "Lock (Permanent)",
]
class Region:
    """A named frequency band divided into evenly spaced channels."""

    def __init__(self, name: str, value: int,
                 start_frequency: float, end_frequency: float, default_channel_number: int) -> None:
        """Store the band limits and derive the channel spacing.

        ``step`` is the band width divided by ``default_channel_number - 1``,
        rounded to two decimals.
        """
        self.name = name
        self.value = value
        self.start_frequency = start_frequency
        self.end_frequency = end_frequency
        self.default_channel_number = default_channel_number
        band_width = self.end_frequency - self.start_frequency
        self.step = round(band_width / (self.default_channel_number - 1), 2)

    def __str__(self) -> str:
        return self.name

    @property
    def index(self) -> int:
        """Position in ``REGIONS`` of the region with the same value (``None`` if absent)."""
        positions = (position for position, candidate in enumerate(REGIONS)
                     if candidate.value == self.value)
        return next(positions, None)

    @property
    def values(self) -> list[float]:
        """Channel frequencies from the start frequency upward, rounded to three decimals."""
        frequencies = []
        for channel in range(self.default_channel_number):
            frequencies.append(round(self.start_frequency + channel * self.step, 3))
        return frequencies

    @classmethod
    def from_value(cls: Type[T], value: int) -> T:
        """Return the region in ``REGIONS`` with this value, or ``None``."""
        return next((candidate for candidate in REGIONS if candidate.value == value), None)

    @classmethod
    def from_name(cls: Type[T], name: str) -> T:
        """Return the region in ``REGIONS`` with this name, or ``None``."""
        return next((candidate for candidate in REGIONS if candidate.name == name), None)

    @classmethod
    def from_index(cls: Type[T], index: int) -> T:
        """Return the region at position ``index`` of ``REGIONS``, or ``None``."""
        return next((candidate for position, candidate in enumerate(REGIONS)
                     if position == index), None)


REGION_CUSTOM = Region("Custom", 0x00, 840, 960, 0)  # TODO: what is its channel number?
REGION_USA = Region("USA", 0x01, 902.75, 927.25, 50)
REGION_KOREA = Region("Korea", 0x02, 917.1, 923.3, 32)
REGION_EUROPE = Region("Europe", 0x03, 865.1, 867.9, 15)
REGION_JAPAN = Region("Japan", 0x04, 952.2, 953.6, 8)
REGION_MALAYSIA = Region("Malaysia", 0x05, 919.5, 922.5, 7)
REGION_EUROPE_3 = Region("Europe 3", 0x06, 865.7, 867.5, 4)
REGION_CHINA_1 = Region("China 1", 0x07, 840.125, 844.875, 20)
REGION_CHINA_2 = Region("China 2", 0x08, 920.125, 924.875, 20)

# REGION_CUSTOM is not part of the lookup list.
REGIONS = [
    REGION_USA,
    REGION_KOREA,
    REGION_EUROPE,
    REGION_JAPAN,
    REGION_MALAYSIA,
    REGION_EUROPE_3,
    REGION_CHINA_1,
    REGION_CHINA_2,
]
@dataclass
class Frequency:
    """A frequency window inside a region, as exchanged with the reader.

    Wire format (8 bytes): region value (1), minimum frequency integer part
    (2, big-endian), minimum frequency fraction in thousandths (2), step in
    thousandths (2), channel count (1).
    """

    region: Region
    min_frequency: float
    max_frequency: float

    @property
    def channel_number(self) -> int:
        """Number of channels between ``min_frequency`` and ``max_frequency`` inclusive.

        Computed in closed form and rounded to the nearest channel. The
        previous step-and-compare loop never terminated when the upper bound
        was off by a floating-point ulp (``917.1 + 6.2 != 923.3``) or was not
        reachable at all.

        Raises:
            ValueError: if the region step is not positive or the bounds are
                reversed (inputs on which the old loop ran forever).
        """
        step = self.region.step
        if step <= 0:
            raise ValueError(f"Region step must be positive, got {step}")
        if self.max_frequency < self.min_frequency:
            raise ValueError("max_frequency is lower than min_frequency")
        return round((self.max_frequency - self.min_frequency) / step) + 1

    @classmethod
    def from_bytes(cls: Type[T], data: bytes) -> T:
        """Decode the 8-byte wire format.

        ``region`` is whatever ``Region.from_value`` returns for the first
        byte, which may be ``None`` for a value it does not know.
        """
        assert len(data) == 8
        region = Region.from_value(data[0])
        min_frequency_int = int.from_bytes(data[1:3], "big")
        min_frequency_fraction = int.from_bytes(data[3:5], "big") / 1000
        step = int.from_bytes(data[5:7], "big")
        channel_number = data[-1]
        min_frequency = min_frequency_int + min_frequency_fraction
        max_frequency = min_frequency + (channel_number - 1) * step / 1000
        return cls(region, min_frequency, max_frequency)

    def to_command_data(self) -> bytes:
        """Encode this window into the 8-byte wire format (returned as a ``bytearray``)."""
        assert self.min_frequency <= self.max_frequency
        # round(), not int(): 917.3 - 917 is 0.29999..., which truncation
        # would encode as 299 instead of 300.
        step: int = round(self.region.step * 1000)
        min_frequency_int, min_frequency_fraction = divmod(round(self.min_frequency * 1000), 1000)
        data = bytearray([self.region.value])
        data.extend(min_frequency_int.to_bytes(2, "big"))
        data.extend(min_frequency_fraction.to_bytes(2, "big"))
        data.extend(step.to_bytes(2, "big"))
        data.extend(self.channel_number.to_bytes(1, "big"))
        return data

    def __str__(self) -> str:
        lines = [
            f'- REGION >> {self.region}',
            f' - MIN FREQUENCY >> {self.min_frequency}',
            f' - MAX FREQUENCY >> {self.max_frequency}',
        ]
        return "\n".join(lines).strip().upper()
class WiegandProtocol(Enum):
    """Wiegand format codes; ``str()`` gives the display label."""

    WG_26 = 0x00
    WG_34 = 0x01

    # Placeholder excluded from the members; the labels are attached below.
    _ignore_ = ["DISPLAY_STRINGS"]
    DISPLAY_STRINGS = []

    def __str__(self) -> str:
        labels = WiegandProtocol.DISPLAY_STRINGS
        return labels[self.value]


WiegandProtocol.DISPLAY_STRINGS = ["WG26", "WG34"]
class WiegandByteFirstType(Enum):
    """Wiegand byte-order codes; ``str()`` gives the display label."""

    LOW_BYTE_FIRST = 0x00
    HIGH_BYTE_FIRST = 0x01

    # Placeholder excluded from the members; the labels are attached below.
    _ignore_ = ["DISPLAY_STRINGS"]
    DISPLAY_STRINGS = []

    def __str__(self) -> str:
        labels = WiegandByteFirstType.DISPLAY_STRINGS
        return labels[self.value]


WiegandByteFirstType.DISPLAY_STRINGS = ["Low byte first", "High byte first"]
@dataclass
class Wiegand:
    """Wiegand settings packed into the three most significant bits of one byte.

    Bit 7 is the open flag, bit 6 the protocol and bit 5 the byte order; the
    five low bits are written as zero.
    """

    is_open: bool
    protocol: WiegandProtocol
    byte_first_type: WiegandByteFirstType

    @classmethod
    def from_bytes(cls: Type[T], data: int) -> T:
        """Decode the settings byte ``data``."""
        bit_text = '{0:08b}'.format(data)
        # The remaining five bits are reserved and ignored.
        open_bit, protocol_bit, order_bit = (int(char) for char in bit_text[:3])
        return Wiegand(bool(open_bit), WiegandProtocol(protocol_bit), WiegandByteFirstType(order_bit))

    def to_int(self) -> int:
        """Encode the settings back into a single byte value."""
        leading_bits = (int(self.is_open), self.protocol.value, self.byte_first_type.value)
        bit_text = ''.join(map(str, leading_bits)) + '00000'
        return int(bit_text, 2)

    def __str__(self) -> str:
        return f'Wiegand -> is_open: {self.is_open}, ' \
               f'protocol: {self.protocol}, byte_first_type: {self.byte_first_type}'
@dataclass
class Antenna:
    """Enable flags for eight antennas, packed into one byte.

    Antenna 8 is the most significant bit and antenna 1 the least significant.
    """

    ant_8: bool
    ant_7: bool
    ant_6: bool
    ant_5: bool
    ant_4: bool
    ant_3: bool
    ant_2: bool
    ant_1: bool

    @classmethod
    def from_bytes(cls: Type[T], data: int) -> T:
        """Decode the antenna mask byte ``data``."""
        bit_text = '{0:08b}'.format(data)
        ant_8, ant_7, ant_6, ant_5, ant_4, ant_3, ant_2, ant_1 = (
            bool(int(char)) for char in bit_text)
        return Antenna(ant_8, ant_7, ant_6, ant_5, ant_4, ant_3, ant_2, ant_1)

    def to_int(self) -> int:
        """Encode the flags back into a single byte value."""
        flags = (self.ant_8, self.ant_7, self.ant_6, self.ant_5,
                 self.ant_4, self.ant_3, self.ant_2, self.ant_1)
        bit_text = ''.join(str(int(flag)) for flag in flags)
        return int(bit_text, 2)

    def __str__(self) -> str:
        return f'Antenna: ant 1({self.ant_1}) -> ant 2({self.ant_2}) -> ant 3({self.ant_3}) -> ant 4({self.ant_4})' \
               f' -> ant 5({self.ant_5}) -> ant 6({self.ant_6}) -> ant 7({self.ant_7}) -> ant 8({self.ant_8})'
@dataclass
class ReaderSettings:
    """The reader's full parameter set, as read and written in one frame."""

    address: int
    rfid_protocol: RfidProtocol
    work_mode: WorkMode
    output_interface: OutputInterface
    baud_rate: BaudRate
    wiegand: Wiegand
    antenna: Antenna
    frequency: Frequency
    power: int
    output_memory_bank: MemoryBank
    q_value: int
    session: Session
    output_start_address: int
    output_length: int
    filter_time: int
    trigger_time: int
    buzzer_time: int
    inventory_interval: int

    def __post_init__(self):
        """Range-check the numeric fields."""
        assert 0x00 <= self.address <= 0xFF
        assert 0 <= self.power <= 33
        assert 0 <= self.q_value <= 15
        single_byte_fields = (
            self.output_start_address,
            self.output_length,
            self.filter_time,
            self.trigger_time,
            self.buzzer_time,
            self.inventory_interval,
        )
        for field_value in single_byte_fields:
            assert 0x00 <= field_value <= 0xFF

    @classmethod
    def from_bytes(cls: Type[T], data: bytes) -> T:
        """Decode the parameter payload (bytes 0..24).

        An open Wiegand flag takes precedence over the interface byte: the
        output interface is then reported as ``OutputInterface.WIEGAND``.
        The buzzer byte is converted to ``bool``.
        """
        wiegand = Wiegand.from_bytes(data[5])
        if wiegand.is_open:
            output_interface = OutputInterface.WIEGAND
        else:
            output_interface = OutputInterface(data[3])
        return ReaderSettings(
            address=data[0],
            rfid_protocol=RfidProtocol(data[1]),
            work_mode=WorkMode(data[2]),
            output_interface=output_interface,
            baud_rate=BaudRate(data[4]),
            wiegand=wiegand,
            antenna=Antenna.from_bytes(data[6]),
            frequency=Frequency.from_bytes(data[7:15]),
            power=data[15],
            output_memory_bank=MemoryBank(data[16]),
            q_value=data[17],
            session=Session(data[18]),
            output_start_address=data[19],
            output_length=data[20],
            filter_time=data[21],
            trigger_time=data[22],
            buzzer_time=bool(data[23]),
            inventory_interval=data[24],
        )

    def to_command_data(self) -> bytes:
        """Encode the settings as the payload of a set-parameters command.

        When the output interface is WIEGAND, ``self.wiegand.is_open`` is set
        to ``True`` (in place) and RS232 is written as the interface byte.
        """
        uses_wiegand = self.output_interface == OutputInterface.WIEGAND
        wiegand = self.wiegand
        if uses_wiegand:
            wiegand.is_open = True
            output_interface = OutputInterface.RS232
        else:
            output_interface = self.output_interface

        data = bytearray([
            self.address,
            self.rfid_protocol.value,
            self.work_mode.value,
            output_interface.value,
            self.baud_rate.value,
            wiegand.to_int(),
            self.antenna.to_int(),
        ])
        data.extend(self.frequency.to_command_data())
        data.extend([
            self.power,
            self.output_memory_bank.value,
            self.q_value,
            self.session.value,
            self.output_start_address,
            self.output_length,
            self.filter_time,
            self.trigger_time,
            self.buzzer_time,
            self.inventory_interval,
        ])
        return data

    def __str__(self) -> str:
        lines = ["<<< START READER SETTINGS ================================"]
        lines.append(f"ADDRESS >> {self.address}")
        lines.append(f"RFID PROTOCOL >> {self.rfid_protocol}")
        lines.append(f"WORK MODE >> {self.work_mode}")
        lines.append(f"OUT INTERFACE >> {self.output_interface}")
        lines.append(f"BAUD RATE >> {self.baud_rate}")
        lines.append(f"WIEGAND >> {self.wiegand}")
        lines.append(f"ANTENNA >> {self.antenna}")
        lines.append(f"FREQUENCY >>\n{self.frequency}")
        lines.append(f"POWER >> {self.power}")
        lines.append(f"OUT MEMORY BANK >> {self.output_memory_bank}")
        lines.append(f"Q VALUE >> {self.q_value}")
        lines.append(f"SESSION >> {self.session}")
        lines.append(f"OUT START ADDRESS >> {self.output_start_address}")
        lines.append(f"OUT LENGTH >> {self.output_length}")
        lines.append(f"FILTER TIME >> {self.filter_time}")
        lines.append(f"TRIGGER TIME >> {self.trigger_time}")
        lines.append(f"BUZZER TIME >> {self.buzzer_time}")
        lines.append(f"INVENTORY INTERVAL >> {self.inventory_interval}")
        lines.append("<<< END READER SETTINGS ================================")
        return "\n".join(lines).strip().upper()
reader.py
from enum import Enum
from typing import Iterator
from command import CommandRequest, Command
from response import Response, Tag, ReaderSettings, InventoryStatus
from transport import Transport
from utils import hex_readable
class StopType(Enum):
    """How an answer-mode inventory decides when to stop.

    TIME: stop according to elapsed time, in seconds.
    NUMBER: stop according to a number of cycles.
    """

    TIME = 0x00
    NUMBER = 0x01
class Reader:
    """Client that sends commands to the reader over a ``Transport`` and parses the replies."""

    def __init__(self, transport: Transport) -> None:
        """Keep ``transport`` and call its ``connect()``."""
        super().__init__()
        self.transport = transport
        self.transport.connect()

    def close(self) -> None:
        """Close the underlying transport."""
        self.transport.close()

    def __receive_response(self) -> bytes | None:
        """Read one raw frame; return ``None`` when no header bytes arrive."""
        # Header section: 5 bytes, the last of which is the body length.
        header = self.transport.read_bytes(length=5)
        if not header:
            return None
        assert len(header) == 5

        # Body section plus the 2 checksum bytes.
        body_length: int = header[-1]
        body_and_checksum = self.transport.read_bytes(length=body_length + 2)
        return header + body_and_checksum

    def __exchange(self, cmd_request: CommandRequest, command: Command) -> Response:
        """Send ``command``, parse one reply and check it echoes ``cmd_request``."""
        self.transport.write_bytes(command.serialize())
        response: Response = Response(self.__receive_response())
        assert response.command == cmd_request
        return response

    def reader_settings(self) -> ReaderSettings:
        """Fetch the reader's current settings."""
        cmd_request = CommandRequest.GET_ALL_PARAM
        response = self.__exchange(cmd_request, Command(cmd_request))
        return ReaderSettings.from_bytes(response.payload)

    def set_reader_settings(self, reader_settings: ReaderSettings) -> Response:
        """Write ``reader_settings`` to the reader and return its reply."""
        cmd_request = CommandRequest.SET_ALL_PARAM
        command = Command(cmd_request, data=reader_settings.to_command_data())
        return self.__exchange(cmd_request, command)

    def start_inventory_answer_mode(self, stop_type: StopType, value: int) -> Iterator[Tag]:
        """Start an inventory and yield each reported tag.

        The request payload is the stop type followed by ``value`` as four
        big-endian bytes. Iteration ends on a NO_COUNT_LABEL status or when a
        reply to INVENTORY_STOP is received; empty reads are skipped and
        reading continues.
        """
        cmd_request = CommandRequest.INVENTORY_ISO_CONTINUE
        request_data = bytearray([stop_type.value])
        request_data.extend(value.to_bytes(4, "big"))
        self.transport.write_bytes(Command(cmd_request, data=request_data).serialize())

        while True:
            raw_response = self.__receive_response()
            if raw_response is None:
                continue

            response = Response(raw_response)
            if InventoryStatus(response.status.value) == InventoryStatus.NO_COUNT_LABEL:
                break
            if response.command == CommandRequest.INVENTORY_STOP:
                break

            payload = response.payload
            # Tag data starts at payload offset 5; the byte at offset 4 is not used here.
            yield Tag(
                rssi=payload[0:2],
                antenna=payload[2],
                channel=payload[3],
                data=payload[5:])

    def stop_inventory_answer_mode(self) -> Response:
        """Ask the reader to stop the running inventory and return its reply."""
        cmd_request = CommandRequest.INVENTORY_STOP
        return self.__exchange(cmd_request, Command(cmd_request))
utils.py
from array import array
def hex_readable(data_bytes: bytes | array, separator: str = " ") -> str:
return separator.join("{:02X}".format(x) for x in data_bytes)
def ip_bytes(ip_str: str) -> bytearray:
    """Convert a dotted IPv4 string such as ``"192.168.1.250"`` into its four bytes."""
    octets = ip_str.split('.')
    assert len(octets) == 4
    return bytearray(int(octet) for octet in octets)
def calculate_checksum(data: bytes) -> bytearray:
    """Compute the 16-bit frame checksum of ``data``.

    Bitwise CRC: initial value 0xFFFF, each byte XORed into the low bits,
    then eight right shifts with polynomial 0x8408 applied whenever the bit
    shifted out is set.

    Returns:
        Two bytes, most significant byte first.
    """
    polynomial = 0x8408
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            carry = crc & 0x0001
            crc >>= 1
            if carry:
                crc ^= polynomial
    return bytearray([crc >> 0x08, crc & 0xFF])
def calculate_rssi(rssi: bytes) -> int:
    """Interpret ``rssi`` as a big-endian signed (two's-complement) integer."""
    signed_value = int.from_bytes(rssi, byteorder="big", signed=True)
    return signed_value
main.py
from typing import Iterator
from response import Tag, ReaderSettings
from transport import Transport, SerialTransport, BaudRate, TcpTransport
from reader import Reader, StopType
# Demo script: run an inventory, read the reader settings, then change the
# power and buzzer settings.
transport: Transport = SerialTransport('/dev/ttyUSB0', BaudRate.BPS_115200)
# transport: Transport = TcpTransport('192.168.1.250', 2022)
reader: Reader = Reader(transport)

# try/finally so the port (or socket) is released even when a step fails.
try:
    # 1. Inventory answer mode
    tags: Iterator[Tag] = reader.start_inventory_answer_mode(stop_type=StopType.TIME, value=3)  # Stop after 3 seconds
    for tag in tags:
        print(tag)

    # 2. Get reader settings
    reader_settings: ReaderSettings = reader.reader_settings()
    print(reader_settings)

    # 3. Set power & buzzer
    reader_settings.power = 5
    reader_settings.buzzer_time = False
    response = reader.set_reader_settings(reader_settings)
    print(f"Response set power & buzzer: {response.status}")
finally:
    reader.close()