Komunikasi EL-UHF-RC4 dengan Komputer
Pada artikel ini, kami mencoba berkomunikasi dengan reader EL-UHF-RC4 series menggunakan protokol melalui koneksi Serial (RS232) atau TCP/IP.
Untuk koneksi Serial, diperlukan kabel RS232 to USB untuk menghubungkan reader ke komputer. Sedangkan untuk koneksi TCP/IP, reader dihubungkan ke komputer/router melalui kabel Ethernet.
Kode
🐍 Python
Kode Python di bawah ini adalah versi sederhana. Contoh kode Python lengkap tersedia dalam program demo (dibuat menggunakan bahasa Python), link download: https://cloud.electron.id/s/wSap3C2PiWXz4Jj
transport.py
import glob
import sys
from enum import Enum
from abc import ABC, abstractmethod
from socket import socket, AF_INET, SOCK_STREAM
from typing import TypeVar
import serial
# Type variable used in return annotations of alternate constructors below.
# NOTE(review): the bound 'Parent' does not name any class visible in this file — confirm the intended bound.
T = TypeVar('T', bound='Parent')
class BaudRate(Enum):
    """Baud-rate codes paired with their bits-per-second speeds.

    A member's value is a small integer code; ``INT`` (assigned right after
    the class body) is indexed by that code to obtain the real speed.
    """

    BPS_9600 = 0x00
    BPS_19200 = 0x01
    BPS_38400 = 0x02
    BPS_57600 = 0x03
    BPS_115200 = 0x04

    # Keep ``INT`` out of member creation so it can be a plain lookup table.
    _ignore_ = ["INT"]
    INT = []

    def __str__(self) -> str:
        """Return the speed followed by ``bps``, e.g. ``115200 bps``."""
        return f'{self.to_int} bps'

    @property
    def to_int(self) -> int:
        """The speed in bits per second for this code."""
        return self.INT[self.value]

    @classmethod
    def from_int(cls, value: int) -> "BaudRate | None":
        """Return the member whose speed equals *value*.

        Returns:
            The matching member, or ``None`` when *value* is not one of the
            speeds listed in ``INT``.
        """
        for baud_rate in cls:
            if baud_rate.to_int == value:
                return baud_rate
        return None


BaudRate.INT = [9600, 19200, 38400, 57600, 115200]
class Transport(ABC):
    """Abstract byte-level link to the reader.

    Concrete subclasses must implement every method below; the abstract
    bodies only raise ``NotImplementedError``.
    """

    @abstractmethod
    def connect(self, **kwargs) -> None:
        """Open the link."""
        raise NotImplementedError

    @abstractmethod
    def reconnect(self, **kwargs) -> None:
        """Close the link and open it again, optionally with new settings.

        The implementations in this file return nothing, so the annotation is
        ``None`` (it previously claimed ``bytes``).
        """
        raise NotImplementedError

    @abstractmethod
    def read_bytes(self, **kwargs) -> bytes:
        """Read bytes from the link; callers in this file pass ``length=<int>``."""
        raise NotImplementedError

    @abstractmethod
    def write_bytes(self, buffer: bytes) -> None:
        """Send *buffer* over the link."""
        raise NotImplementedError

    @abstractmethod
    def clear_buffer(self) -> None:
        """Discard any buffered data."""
        raise NotImplementedError

    @abstractmethod
    def close(self) -> None:
        """Close the link."""
        raise NotImplementedError
class TcpTransport(Transport):
    """Transport that talks to the reader over a TCP stream socket."""

    def __init__(self, ip_address: str, port: int, timeout: int = 6) -> None:
        """Store the connection settings; the socket is opened by ``connect()``."""
        self.ip_address: str = ip_address
        self.port: int = port
        self.timeout: int = timeout
        self.socket: socket | None = None

    def __str__(self) -> str:
        return f'TcpTransport(ip_address: {self.ip_address}, port: {self.port}, timeout: {self.timeout})'

    def connect(self) -> None:
        """Create a new socket, apply the timeout and connect to the reader."""
        self.socket = socket(AF_INET, SOCK_STREAM)
        self.socket.settimeout(self.timeout)
        self.socket.connect((self.ip_address, self.port))

    def reconnect(self, ip_address: str | None = None, port: int | None = None,
                  timeout: int | None = None) -> None:
        """Close the current socket and connect again.

        Any argument left falsy keeps the current setting. ``port`` is now
        honoured; it used to be accepted but silently ignored.
        """
        self.close()
        self.ip_address = ip_address if ip_address else self.ip_address
        self.port = port if port else self.port
        self.timeout = timeout if timeout else self.timeout
        self.connect()

    def read_bytes(self, length: int) -> bytes:
        """Read exactly *length* bytes unless the peer closes the connection.

        A single ``recv`` on a stream socket may return fewer bytes than
        requested, so keep reading until the frame part is complete. A timeout
        still propagates from ``recv`` as before.
        """
        received = bytearray()
        while len(received) < length:
            chunk = self.socket.recv(length - len(received))
            if not chunk:  # empty read: the peer closed the connection
                break
            received.extend(chunk)
        return bytes(received)

    def write_bytes(self, buffer: bytes) -> None:
        """Send the whole *buffer*."""
        self.socket.sendall(buffer)

    def clear_buffer(self) -> None:
        """No-op for TCP: nothing is drained here."""
        pass

    def close(self) -> None:
        """Close the socket if one was ever opened."""
        if self.socket is not None:
            self.socket.close()
class SerialTransport(Transport):
    """Transport backed by a ``serial.Serial`` port that is opened on construction."""

    def __init__(self, serial_port: str, baud_rate: BaudRate, timeout: int = 0.5) -> None:
        """Open *serial_port* at *baud_rate*; the write timeout is twice *timeout*."""
        self.serial_port = serial_port
        self.baud_rate = baud_rate
        self.timeout = timeout
        self.serial = serial.Serial(
            self.serial_port,
            self.baud_rate.to_int,
            timeout=timeout,
            write_timeout=timeout * 2,
        )

    def __str__(self) -> str:
        return f'SerialTransport(port: {self.serial_port}, baud_rate: {self.baud_rate})'

    @classmethod
    def scan(cls, timeout: int = 1) -> list[str]:
        """Return the candidate port names that can actually be opened.

        Raises:
            EnvironmentError: when ``sys.platform`` is none of the handled ones.
        """
        platform = sys.platform
        if platform.startswith("win"):  # Windows
            candidates = [f"COM{number}" for number in range(1, 16)]
        elif platform.startswith(("linux", "cygwin")):  # Linux
            # this excludes your current terminal "/dev/tty"
            candidates = glob.glob("/dev/tty[A-Za-z]*")
        elif platform.startswith("darwin"):  # Mac OS
            candidates = glob.glob("/dev/tty.*")
        else:
            raise EnvironmentError("Unsupported platform")

        available: list[str] = []
        for candidate in candidates:
            try:
                # Opening and immediately closing is the availability probe.
                serial.Serial(candidate, timeout=timeout).close()
            except serial.SerialException:
                continue
            available.append(candidate)
        return available

    def connect(self, **kwargs):
        """Nothing to do: the port is already opened by ``__init__``/``reconnect``."""
        pass

    def reconnect(self, serial_port: str | None = None, baud_rate: BaudRate | None = None,
                  timeout: int | None = None) -> None:
        """Close the port and reopen it; falsy arguments reuse the current port's values."""
        self.close()
        self.serial_port = serial_port or self.serial.port
        self.baud_rate = baud_rate or BaudRate.from_int(self.serial.baudrate)
        self.timeout = timeout or self.serial.timeout
        self.serial = serial.Serial(
            self.serial_port,
            self.baud_rate.to_int,
            timeout=self.timeout,
            write_timeout=self.timeout * 2,
        )

    def read_bytes(self, length: int) -> bytes:
        """Read up to *length* bytes from the port."""
        return self.serial.read(length)

    def write_bytes(self, buffer: bytes) -> None:
        """Write *buffer* to the port."""
        self.serial.write(buffer)

    def clear_buffer(self) -> None:
        """Reset both the input and the output buffer of the port."""
        self.serial.reset_input_buffer()
        self.serial.reset_output_buffer()

    def close(self) -> None:
        """Close the port."""
        self.serial.close()


# command.py
from enum import Enum
from utils import calculate_checksum
# First byte of a frame: Command.serialize() emits it as the leading byte.
HEADER = 0xCF
# Default value of Command's ``address`` argument.
BROADCAST_ADDRESS = 0xFF
class CommandOption(Enum):
    """Whether a command writes (``SET``) or reads (``GET``) a value."""

    SET = 0x01
    GET = 0x02
class CommandRequest(Enum):
    """Two-byte command codes placed in a frame after the address byte."""

    SET_ALL_PARAM = 0x0071
    GET_ALL_PARAM = 0x0072
    INVENTORY_ISO_CONTINUE = 0x0001
    INVENTORY_STOP = 0x0002
    # Same value as INVENTORY_ISO_CONTINUE, so Enum treats this name as an alias of it.
    INVENTORY_ACTIVE = 0x0001
class Command(object):
    """A request frame: header, address, 2-byte command code, data length, data, checksum."""

    def __init__(self,
                 command: CommandRequest,
                 address=BROADCAST_ADDRESS,
                 data: bytes | bytearray | None = None) -> None:
        """Store the frame fields.

        Args:
            command: Command code to send.
            address: Target address byte; defaults to ``BROADCAST_ADDRESS``.
            data: Optional payload. Omitted means an empty payload; a fresh
                ``bytearray`` is created per instance so instances never share
                one mutable default object.
        """
        self.address = address
        self.command = command
        self.data = bytearray() if data is None else data

    def serialize(self, with_checksum: bool = True) -> bytearray:
        """Build the frame bytes.

        Args:
            with_checksum: Append the result of ``calculate_checksum`` over the
                frame built so far.

        Raises:
            ValueError: if the payload does not fit the one-byte length field.
        """
        payload = bytes(self.data)
        if len(payload) > 0xFF:
            raise ValueError(
                f"Command data is {len(payload)} bytes; the length field holds at most 255")

        frame = bytearray([HEADER, self.address])
        frame += self.command.value.to_bytes(2, "big")
        frame.append(len(payload))
        frame += payload
        if with_checksum:
            # The checksum is computed over everything written so far.
            frame.extend(calculate_checksum(frame))
        return frame


# response.py
from dataclasses import dataclass
from enum import Enum
from typing import TypeVar, Type
from command import HEADER, CommandRequest
from utils import calculate_checksum, hex_readable, calculate_rssi
# Type variable used by the ``cls: Type[T]) -> T`` alternate constructors below.
# NOTE(review): the bound 'Parent' does not name any class visible in this file — confirm the intended bound.
T = TypeVar('T', bound='Parent')
class Status(Enum):
    """Status byte values of a response frame."""

    SUCCESS = 0x00
    WRONG_PARAM = 0x01
    CMD_EXECUTION_FAILED = 0x02
    RESERVE = 0x03
    NO_COUNT_LABEL = 0x12
    TIMEOUT = 0x14
    TAG_RESPONSE_ERROR = 0x15
    AUTHENTICATION_FAILED = 0x16
    WRONG_PASSWORD = 0x17
    NO_MORE_DATA = 0xFF
class TagStatus(Enum):
    """Tag-level status codes."""

    NO_ERROR = 0xFF
    TIMEOUT = 0x14
    OTHER_ERROR = 0x81
    STORAGE_AREA_ERROR = 0x82
    STORAGE_LOCK = 0x83
    INSUFFICIENT_POWER = 0x84
    NO_POWER = 0x85
class InventoryStatus(Enum):
    """Status codes recognised while an inventory is running."""

    SUCCESS = 0x00
    WRONG_PARAM = 0x01
    CMD_EXECUTION_FAILED = 0x02
    NO_COUNT_LABEL = 0x12
    EXCEED_MAX_TRANSMIT_SERIAL = 0x17
@dataclass
class Tag:
    """One tag read: raw RSSI bytes, antenna, channel and the tag data bytes."""

    rssi: bytes
    antenna: int
    channel: int
    data: bytes
    count: int = 1

    def __str__(self) -> str:
        # Only the first three characters of the decoded RSSI are shown.
        rssi_text = str(calculate_rssi(self.rssi))[0:3]
        data_text = hex_readable(self.data)
        return (f'Tag(rssi: {rssi_text}, '
                f'antenna: {self.antenna}, channel: {self.channel}, '
                f'data: {data_text})')
class Response:
    """Parsed response frame.

    Layout read by the constructor: header (1 byte), address (1), command (2),
    length (1), status (1), payload, checksum (2).
    """

    def __init__(self, response: bytes) -> None:
        """Parse *response* and verify its header byte and checksum.

        Raises:
            ValueError: if *response* is ``None`` (the command and status
                lookups also raise it for unknown values).
            AssertionError: if the header byte or the checksum does not match.
        """
        if response is None:
            raise ValueError("Response is None")

        frame_head: bytes = response[0:5]
        assert frame_head[0] == HEADER

        self.header: int = response[0]
        self.address: int = response[1]
        self.command: CommandRequest = CommandRequest(int.from_bytes(response[2:4], "big"))
        self.length: int = response[4]
        self.status: Status = Status(response[5])

        # Everything after the status byte: payload followed by the 2 checksum bytes.
        frame_end = 4 + self.length + 2 + 1
        tail: bytes = response[6:frame_end]
        self.payload: bytes = tail[0:-2]
        self.checksum: bytes = tail[-2:]

        # Verify checksum over head + status + payload
        covered = bytearray(frame_head)
        covered.append(self.status.value)
        if self.payload:
            covered.extend(self.payload)
        crc_msb, crc_lsb = calculate_checksum(covered)
        assert self.checksum[0] == crc_msb and self.checksum[1] == crc_lsb

    def __str__(self) -> str:
        lines = [
            "<<< START RESPONSE ================================",
            f"COMMAND >> {self.command}",
            f"STATUS >> {self.status}",
        ]
        if self.payload:
            lines.append(f"PAYLOAD >> {hex_readable(self.payload)}")
        lines.append("<<< END RESPONSE ================================")
        return "\n".join(lines).strip().upper()
class RfidProtocol(Enum):
    """RFID air-protocol codes; ``str()`` gives the display label.

    NOTE(review): the original docstring read "Only for ISO 18000-6C" —
    presumably only that member is supported; confirm.
    """

    ISO_18000_6C = 0x00
    GBT_29768 = 0x01
    GJB_7377_1 = 0x02

    # DISPLAY_STRINGS is a lookup table, not a member; it is filled in below.
    _ignore_ = ["DISPLAY_STRINGS"]
    DISPLAY_STRINGS = []

    def __str__(self) -> str:
        labels = RfidProtocol.DISPLAY_STRINGS
        return labels[self.value]


RfidProtocol.DISPLAY_STRINGS = ["ISO 18000-6C", "GB/T 29768", "GJB 7377.1"]
class BaudRate(Enum):
    """Baud-rate codes paired with their bits-per-second speeds.

    A member's value is a small integer code; ``INT`` (assigned right after
    the class body) is indexed by that code to obtain the real speed.
    """

    BPS_9600 = 0x00
    BPS_19200 = 0x01
    BPS_38400 = 0x02
    BPS_57600 = 0x03
    BPS_115200 = 0x04

    # Keep ``INT`` out of member creation so it can be a plain lookup table.
    _ignore_ = ["INT"]
    INT = []

    def __str__(self) -> str:
        """Return the speed followed by ``bps``, e.g. ``115200 bps``."""
        return f'{self.to_int} bps'

    @property
    def to_int(self) -> int:
        """The speed in bits per second for this code."""
        return self.INT[self.value]

    @classmethod
    def from_int(cls, value: int) -> "BaudRate | None":
        """Return the member whose speed equals *value*.

        Returns:
            The matching member, or ``None`` when *value* is not one of the
            speeds listed in ``INT``.
        """
        for baud_rate in cls:
            if baud_rate.to_int == value:
                return baud_rate
        return None


BaudRate.INT = [9600, 19200, 38400, 57600, 115200]
class Relay(Enum):
    """Relay states; values start at 1, so the label index is ``value - 1``."""

    OPEN = 0x01
    CLOSE = 0x02

    # DISPLAY_STRINGS is a lookup table, not a member; it is filled in below.
    _ignore_ = ["DISPLAY_STRINGS"]
    DISPLAY_STRINGS = []

    def __str__(self) -> str:
        position = self.value - 1
        return Relay.DISPLAY_STRINGS[position]

    def to_index(self) -> int:
        """Zero-based position of this state."""
        zero_based = self.value - 1
        return zero_based


Relay.DISPLAY_STRINGS = [
    "Open",
    "Close",
]
class WorkMode(Enum):
    """Reader work modes; ``str()`` gives the display label."""

    ANSWER_MODE = 0x00
    ACTIVE_MODE = 0x01
    TRIGGER_MODE = 0x02

    # DISPLAY_STRINGS is a lookup table, not a member; it is filled in below.
    _ignore_ = ["DISPLAY_STRINGS"]
    DISPLAY_STRINGS = []

    def __str__(self) -> str:
        labels = WorkMode.DISPLAY_STRINGS
        return labels[self.value]


WorkMode.DISPLAY_STRINGS = [
    "Answer Mode",
    "Active Mode",
    "Trigger Mode",
]
class OutputInterface(Enum):
    """Output interface codes.

    Labels in ``DISPLAY_STRINGS`` are matched by declaration position, not by
    value, so the two lists must stay in the same order.
    """

    WIEGAND = 0x99
    RS232 = 0x80
    RS485 = 0x40
    RJ45 = 0x20
    # WiFi = 0x10
    USB = 0x01
    KEYBOARD = 0x02
    # CDC_COM = 0x04

    # DISPLAY_STRINGS is a lookup table, not a member; it is filled in below.
    _ignore_ = ["DISPLAY_STRINGS"]
    DISPLAY_STRINGS = []

    @property
    def index(self) -> int:
        """Declaration position of this member."""
        return list(OutputInterface).index(self)

    def __str__(self) -> str:
        position = self.index
        return OutputInterface.DISPLAY_STRINGS[position]

    @classmethod
    def from_index(cls: Type[T], index: int) -> T:
        """Return the member declared at *index*, or ``None`` if there is none."""
        matches = (member for position, member in enumerate(OutputInterface)
                   if position == index)
        return next(matches, None)


OutputInterface.DISPLAY_STRINGS = [
    "Wiegand",
    "RS232",
    "RS485",
    "RJ45",
    "USB",
    "Keyboard",
    # "CDC_COM"
]
class Session(Enum):
    """Session codes 0-3; ``str()`` gives the short label (``S0`` .. ``S3``)."""

    SESSION_0 = 0x00
    SESSION_1 = 0x01
    SESSION_2 = 0x02
    SESSION_3 = 0x03

    # DISPLAY_STRINGS is a lookup table, not a member; it is filled in below.
    _ignore_ = ["DISPLAY_STRINGS"]
    DISPLAY_STRINGS = []

    def __str__(self) -> str:
        labels = Session.DISPLAY_STRINGS
        return labels[self.value]


Session.DISPLAY_STRINGS = ["S0", "S1", "S2", "S3"]
class MemoryBank(Enum):
    """Tag memory bank codes; ``str()`` gives the display label."""

    PASSWORD = 0x00
    EPC = 0x01
    TID = 0x02
    USER = 0x03

    # DISPLAY_STRINGS is a lookup table, not a member; it is filled in below.
    _ignore_ = ["DISPLAY_STRINGS"]
    DISPLAY_STRINGS = []

    def __str__(self) -> str:
        labels = MemoryBank.DISPLAY_STRINGS
        return labels[self.value]


MemoryBank.DISPLAY_STRINGS = ["Password", "EPC", "TID", "User"]
class LockMemoryBank(Enum):
    """Memory areas that can be locked or unlocked.

    Labels in ``DISPLAY_STRINGS`` are matched by declaration position, so they
    must be listed in the same order as the members.
    """

    KILL_PASSWORD = 0x00
    ACCESS_PASSWORD = 0x01
    EPC = 0x02
    # TID = 0x03  # Unlock/lock response is None
    USER = 0x04

    # DISPLAY_STRINGS is a lookup table, not a member; it is filled in below.
    _ignore_ = ["DISPLAY_STRINGS"]
    DISPLAY_STRINGS = []

    def to_index(self) -> int:
        """Declaration position of this member."""
        for index, lock_memory_bank in enumerate(LockMemoryBank):
            if lock_memory_bank.value == self.value:
                return index

    @classmethod
    def from_index(cls: Type[T], index: int) -> T:
        """Return the member declared at *index*, or ``None`` if there is none."""
        for i, value in enumerate(LockMemoryBank):
            if index == i:
                return value
        return None

    def __str__(self) -> str:
        return LockMemoryBank.DISPLAY_STRINGS[self.to_index()]


# Order fixed to follow the member order above: the first two labels used to be
# swapped, so KILL_PASSWORD was displayed as "Access Password" and vice versa.
LockMemoryBank.DISPLAY_STRINGS = [
    "Kill Password",
    "Access Password",
    "EPC",
    "User"
]
class LockAction(Enum):
    """Lock/unlock actions; ``str()`` gives the display label."""

    UNLOCK = 0x00
    UNLOCK_PERMANENT = 0x01
    LOCK = 0x02
    LOCK_PERMANENT = 0x03

    # DISPLAY_STRINGS is a lookup table, not a member; it is filled in below.
    _ignore_ = ["DISPLAY_STRINGS"]
    DISPLAY_STRINGS = []

    def __str__(self) -> str:
        labels = LockAction.DISPLAY_STRINGS
        return labels[self.value]


LockAction.DISPLAY_STRINGS = ["Unlock", "Unlock (Permanent)", "Lock", "Lock (Permanent)"]
class Region:
    """A frequency region: name, code, band limits and default channel count.

    ``step`` is derived as the band width divided by ``default_channel_number - 1``,
    rounded to two decimals.
    """

    def __init__(self, name: str, value: int,
                 start_frequency: float, end_frequency: float, default_channel_number: int) -> None:
        self.name = name
        self.value = value
        self.start_frequency = start_frequency
        self.end_frequency = end_frequency
        self.default_channel_number = default_channel_number
        band_width = self.end_frequency - self.start_frequency
        self.step = round(band_width / (self.default_channel_number - 1), 2)

    def __str__(self) -> str:
        return self.name

    @property
    def index(self) -> int:
        """Position of this region's code in ``REGIONS`` (``None`` if absent)."""
        positions = (position for position, candidate in enumerate(REGIONS)
                     if candidate.value == self.value)
        return next(positions, None)

    @property
    def values(self) -> list[float]:
        """Channel frequencies from the start frequency, one per default channel."""
        frequencies = []
        for channel in range(self.default_channel_number):
            frequencies.append(round(self.start_frequency + channel * self.step, 3))
        return frequencies

    @classmethod
    def from_value(cls: Type[T], value: int) -> T:
        """Return the region in ``REGIONS`` with code *value*, else ``None``."""
        return next((candidate for candidate in REGIONS if candidate.value == value), None)

    @classmethod
    def from_name(cls: Type[T], name: str) -> T:
        """Return the region in ``REGIONS`` called *name*, else ``None``."""
        return next((candidate for candidate in REGIONS if candidate.name == name), None)

    @classmethod
    def from_index(cls: Type[T], index: int) -> T:
        """Return the region at position *index* in ``REGIONS``, else ``None``."""
        return next((candidate for position, candidate in enumerate(REGIONS)
                     if position == index), None)
# Predefined regions: Region(name, value, start_frequency, end_frequency, default_channel_number).
# TODO(review): the channel number for the custom region is an open question; 0 is a placeholder.
REGION_CUSTOM = Region("Custom", 0x00, 840, 960, 0)
REGION_USA = Region("USA", 0x01, 902.75, 927.25, 50)
REGION_KOREA = Region("Korea", 0x02, 917.1, 923.3, 32)
REGION_EUROPE = Region("Europe", 0x03, 865.1, 867.9, 15)
REGION_JAPAN = Region("Japan", 0x04, 952.2, 953.6, 8)
REGION_MALAYSIA = Region("Malaysia", 0x05, 919.5, 922.5, 7)
REGION_EUROPE_3 = Region("Europe 3", 0x06, 865.7, 867.5, 4)
REGION_CHINA_1 = Region("China 1", 0x07, 840.125, 844.875, 20)
REGION_CHINA_2 = Region("China 2", 0x08, 920.125, 924.875, 20)
# Regions searched by the Region.from_* lookups and Region.index.
# NOTE(review): REGION_CUSTOM is not listed here, so those lookups never return it — confirm this is intended.
REGIONS = [REGION_USA, REGION_KOREA, REGION_EUROPE, REGION_JAPAN, REGION_MALAYSIA,
           REGION_EUROPE_3, REGION_CHINA_1, REGION_CHINA_2]
@dataclass
class Frequency:
    """A frequency range inside a region, given by its lowest and highest frequency."""

    region: Region
    min_frequency: float
    max_frequency: float

    @property
    def channel_number(self) -> int:
        """Number of channels between the two bounds, both included.

        Computed arithmetically from ``region.step``. The previous version
        stepped a float until it compared equal to ``max_frequency`` and never
        terminated when the bounds were off the grid by float noise, when the
        step was not positive, or when ``max_frequency < min_frequency``.

        Raises:
            ValueError: if the bounds differ but the step is not positive or
                the range is reversed.
        """
        if self.min_frequency == self.max_frequency:
            return 1
        step = self.region.step
        span = self.max_frequency - self.min_frequency
        if step <= 0 or span < 0:
            raise ValueError(
                f"Cannot count channels from {self.min_frequency} to "
                f"{self.max_frequency} with step {step}")
        # round() absorbs float noise such as 2.8 / 0.2 == 13.999999999999998
        return round(span / step) + 1

    @classmethod
    def from_bytes(cls: Type[T], data: bytes) -> T:
        """Decode the 8-byte frequency field.

        Layout read here: region code (1 byte), integer part of the minimum
        frequency (2), its fraction in thousandths (2), step in thousandths
        (2), channel count (1). Both frequencies are rounded to three decimals
        so that they sit exactly on the channel grid.
        """
        assert len(data) == 8
        region = Region.from_value(data[0])
        min_frequency_int = int.from_bytes(data[1:3], "big")
        min_frequency_fraction = int.from_bytes(data[3:5], "big")
        step = int.from_bytes(data[5:7], "big")
        channel_number = data[-1]
        min_frequency = round(min_frequency_int + min_frequency_fraction / 1000, 3)
        max_frequency = round(min_frequency + (channel_number - 1) * step / 1000, 3)
        return cls(region, min_frequency, max_frequency)

    def to_command_data(self) -> bytes:
        """Encode this range into the 8-byte layout that ``from_bytes`` reads.

        Values are converted to thousandths with ``round()``; ``int()``
        truncated toward zero, so e.g. 917.3 was sent with fraction 299.
        """
        assert self.min_frequency <= self.max_frequency
        step: int = round(self.region.step * 1000)
        min_frequency_int, min_frequency_fraction = divmod(round(self.min_frequency * 1000), 1000)
        data = bytearray([self.region.value])
        data.extend(min_frequency_int.to_bytes(2, "big"))
        data.extend(min_frequency_fraction.to_bytes(2, "big"))
        data.extend(step.to_bytes(2, "big"))
        data.extend(self.channel_number.to_bytes(1, "big"))
        return data

    def __str__(self) -> str:
        lines = [
            f'- REGION >> {self.region}',
            f' - MIN FREQUENCY >> {self.min_frequency}',
            f' - MAX FREQUENCY >> {self.max_frequency}',
        ]
        return "\n".join(lines).strip().upper()
class WiegandProtocol(Enum):
    """Wiegand protocol variants; ``str()`` gives the display label."""

    WG_26 = 0x00
    WG_34 = 0x01

    # DISPLAY_STRINGS is a lookup table, not a member; it is filled in below.
    _ignore_ = ["DISPLAY_STRINGS"]
    DISPLAY_STRINGS = []

    def __str__(self) -> str:
        labels = WiegandProtocol.DISPLAY_STRINGS
        return labels[self.value]


WiegandProtocol.DISPLAY_STRINGS = [
    "WG26",
    "WG34",
]
class WiegandByteFirstType(Enum):
    """Wiegand byte-order options; ``str()`` gives the display label."""

    LOW_BYTE_FIRST = 0x00
    HIGH_BYTE_FIRST = 0x01

    # DISPLAY_STRINGS is a lookup table, not a member; it is filled in below.
    _ignore_ = ["DISPLAY_STRINGS"]
    DISPLAY_STRINGS = []

    def __str__(self) -> str:
        labels = WiegandByteFirstType.DISPLAY_STRINGS
        return labels[self.value]


WiegandByteFirstType.DISPLAY_STRINGS = [
    "Low byte first",
    "High byte first",
]
@dataclass
class Wiegand:
    """Wiegand settings packed into the three leading bits of one byte.

    Reading the byte as eight binary digits, the first digit is ``is_open``,
    the second the protocol and the third the byte order; the remaining five
    are written as zero.
    """

    is_open: bool
    protocol: WiegandProtocol
    byte_first_type: WiegandByteFirstType

    @classmethod
    def from_bytes(cls: Type[T], data: int) -> T:
        """Decode the settings from the integer *data*."""
        digits: list[int] = [int(digit) for digit in f"{data:08b}"]
        # digits[3:8] are reserved
        open_bit, protocol_bit, order_bit = digits[0], digits[1], digits[2]
        return Wiegand(bool(open_bit), WiegandProtocol(protocol_bit), WiegandByteFirstType(order_bit))

    def to_int(self) -> int:
        """Encode the settings back into one integer."""
        leading = f"{int(self.is_open)}{self.protocol.value}{self.byte_first_type.value}"
        reserved = "0" * 5
        return int(leading + reserved, 2)

    def __str__(self) -> str:
        return (f'Wiegand -> is_open: {self.is_open}, '
                f'protocol: {self.protocol}, byte_first_type: {self.byte_first_type}')
@dataclass
class Antenna:
    """Eight antenna flags stored as the bits of one byte.

    Reading the byte as eight binary digits, the first digit is ``ant_8`` and
    the last one is ``ant_1``.
    """

    ant_8: bool
    ant_7: bool
    ant_6: bool
    ant_5: bool
    ant_4: bool
    ant_3: bool
    ant_2: bool
    ant_1: bool

    @classmethod
    def from_bytes(cls: Type[T], data: int) -> T:
        """Decode the flags from the integer *data*."""
        flags = [bool(int(digit)) for digit in f"{data:08b}"]
        ant_8, ant_7, ant_6, ant_5, ant_4, ant_3, ant_2, ant_1 = flags
        return Antenna(ant_8=ant_8, ant_7=ant_7, ant_6=ant_6, ant_5=ant_5,
                       ant_4=ant_4, ant_3=ant_3, ant_2=ant_2, ant_1=ant_1)

    def to_int(self) -> int:
        """Encode the flags back into one integer."""
        ordered = (self.ant_8, self.ant_7, self.ant_6, self.ant_5,
                   self.ant_4, self.ant_3, self.ant_2, self.ant_1)
        digits = ''.join(str(int(flag)) for flag in ordered)
        return int(digits, 2)

    def __str__(self) -> str:
        states = (self.ant_1, self.ant_2, self.ant_3, self.ant_4,
                  self.ant_5, self.ant_6, self.ant_7, self.ant_8)
        parts = [f'ant {number}({state})' for number, state in enumerate(states, start=1)]
        return 'Antenna: ' + ' -> '.join(parts)
@dataclass
class ReaderSettings:
    """All reader parameters, decoded from / encoded to the 25-byte parameter block."""

    address: int
    rfid_protocol: RfidProtocol
    work_mode: WorkMode
    output_interface: OutputInterface
    baud_rate: BaudRate
    wiegand: Wiegand
    antenna: Antenna
    frequency: Frequency
    power: int
    output_memory_bank: MemoryBank
    q_value: int
    session: Session
    output_start_address: int
    output_length: int
    filter_time: int
    trigger_time: int
    buzzer_time: int
    inventory_interval: int

    def __post_init__(self):
        """Check that every plain integer field is inside its allowed range."""
        assert 0x00 <= self.address <= 0xFF
        assert 0 <= self.power <= 33
        assert 0 <= self.q_value <= 15
        assert 0x00 <= self.output_start_address <= 0xFF
        assert 0x00 <= self.output_length <= 0xFF
        assert 0x00 <= self.filter_time <= 0xFF
        assert 0x00 <= self.trigger_time <= 0xFF
        assert 0x00 <= self.buzzer_time <= 0xFF
        assert 0x00 <= self.inventory_interval <= 0xFF

    @classmethod
    def from_bytes(cls: Type[T], data: bytes) -> T:
        """Decode a parameter block (bytes 0..24).

        When the Wiegand byte has its "open" bit set the output interface is
        reported as ``OutputInterface.WIEGAND`` and byte 3 is not interpreted.
        ``buzzer_time`` keeps the raw byte; it used to be collapsed to a
        ``bool``, which lost any value above 1 on a read/write round trip.
        """
        wiegand = Wiegand.from_bytes(data[5])
        output_interface = OutputInterface.WIEGAND if wiegand.is_open else OutputInterface(data[3])
        return cls(
            address=data[0],
            rfid_protocol=RfidProtocol(data[1]),
            work_mode=WorkMode(data[2]),
            output_interface=output_interface,
            baud_rate=BaudRate(data[4]),
            wiegand=wiegand,
            antenna=Antenna.from_bytes(data[6]),
            frequency=Frequency.from_bytes(data[7:15]),
            power=data[15],
            output_memory_bank=MemoryBank(data[16]),
            q_value=data[17],
            session=Session(data[18]),
            output_start_address=data[19],
            output_length=data[20],
            filter_time=data[21],
            trigger_time=data[22],
            buzzer_time=data[23],
            inventory_interval=data[24],
        )

    def to_command_data(self) -> bytes:
        """Encode the settings into the parameter block layout read by ``from_bytes``.

        Selecting ``OutputInterface.WIEGAND`` sets ``self.wiegand.is_open`` (the
        stored object is updated, as before) and sends RS232 as the interface byte.
        """
        # Wiegand
        wiegand = self.wiegand
        wiegand_selected = self.output_interface == OutputInterface.WIEGAND
        if wiegand_selected:
            wiegand.is_open = True
        output_interface = OutputInterface.RS232 if wiegand_selected else self.output_interface

        data = bytearray([self.address, self.rfid_protocol.value, self.work_mode.value,
                          output_interface.value, self.baud_rate.value,
                          wiegand.to_int(), self.antenna.to_int()])
        data.extend(self.frequency.to_command_data())
        data.extend([self.power, self.output_memory_bank.value, self.q_value, self.session.value,
                     self.output_start_address, self.output_length,
                     self.filter_time, self.trigger_time, self.buzzer_time, self.inventory_interval])
        return data

    def __str__(self) -> str:
        settings = [
            "<<< START READER SETTINGS ================================",
            f"ADDRESS >> {self.address}",
            f"RFID PROTOCOL >> {self.rfid_protocol}",
            f"WORK MODE >> {self.work_mode}",
            f"OUT INTERFACE >> {self.output_interface}",
            f"BAUD RATE >> {self.baud_rate}",
            f"WIEGAND >> {self.wiegand}",
            f"ANTENNA >> {self.antenna}",
            f"FREQUENCY >>\n{self.frequency}",
            f"POWER >> {self.power}",
            f"OUT MEMORY BANK >> {self.output_memory_bank}",
            f"Q VALUE >> {self.q_value}",
            f"SESSION >> {self.session}",
            f"OUT START ADDRESS >> {self.output_start_address}",
            f"OUT LENGTH >> {self.output_length}",
            f"FILTER TIME >> {self.filter_time}",
            f"TRIGGER TIME >> {self.trigger_time}",
            f"BUZZER TIME >> {self.buzzer_time}",
            f"INVENTORY INTERVAL >> {self.inventory_interval}",
            "<<< END READER SETTINGS ================================"
        ]
        return "\n".join(settings).strip().upper()


# reader.py
from enum import Enum
from typing import Iterator
from command import CommandRequest, Command
from response import Response, Tag, ReaderSettings, InventoryStatus
from transport import Transport
from utils import hex_readable
class StopType(Enum):
    """How an inventory run is limited.

    TIME: stop according to the time (in seconds).
    NUMBER: stop according to the number of cycles.
    """

    TIME = 0x00
    NUMBER = 0x01
class Reader:
    """Client that sends commands over a ``Transport`` and parses the replies.

    The transport is connected in the constructor. The object can also be used
    as a context manager, which closes the transport on exit.
    """

    def __init__(self, transport: Transport) -> None:
        super().__init__()
        self.transport = transport
        self.transport.connect()

    def __enter__(self) -> "Reader":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def close(self) -> None:
        """Close the underlying transport."""
        self.transport.close()

    def __receive_response(self) -> bytes | None:
        """Read one raw frame: 5 header bytes, then the body plus 2 checksum bytes.

        Returns:
            The raw frame, or ``None`` when nothing could be read for the header.
        """
        # Get header section
        response_header_section: bytes = self.transport.read_bytes(length=5)
        if not response_header_section:
            return
        assert len(response_header_section) == 5

        # Get body section; the last header byte is the body length
        body_length: int = response_header_section[-1]
        response_body_section: bytes = self.transport.read_bytes(length=body_length + 2)  # 2(checksum)
        return response_header_section + response_body_section

    def __execute(self, command: Command) -> Response:
        """Send *command*, read one response and check that it answers the same command."""
        self.transport.write_bytes(command.serialize())
        response: Response = Response(self.__receive_response())
        assert response.command == command.command
        return response

    def reader_settings(self) -> ReaderSettings:
        """Request all parameters and decode them."""
        response = self.__execute(Command(CommandRequest.GET_ALL_PARAM))
        return ReaderSettings.from_bytes(response.payload)

    def set_reader_settings(self, reader_settings: ReaderSettings) -> Response:
        """Send all parameters and return the reader's response."""
        command = Command(CommandRequest.SET_ALL_PARAM, data=reader_settings.to_command_data())
        return self.__execute(command)

    def start_inventory_answer_mode(self, stop_type: StopType, value: int) -> Iterator[Tag]:
        """Start an inventory and yield a ``Tag`` for every tag response.

        Args:
            stop_type: Whether *value* is a time or a number of cycles.
            value: The limit, sent as a 4-byte big-endian integer.

        The generator ends on a ``NO_COUNT_LABEL`` status or when a response
        carries the ``INVENTORY_STOP`` command. Empty reads are retried.
        """
        data = bytearray([stop_type.value])
        data.extend(value.to_bytes(4, "big"))
        command = Command(CommandRequest.INVENTORY_ISO_CONTINUE, data=data)

        # Send request
        self.transport.write_bytes(command.serialize())

        while True:
            raw_response: bytes | None = self.__receive_response()
            if raw_response is None:
                continue

            response: Response = Response(raw_response)
            inventory_status: InventoryStatus = InventoryStatus(response.status.value)
            if inventory_status == InventoryStatus.NO_COUNT_LABEL:
                break
            if response.command == CommandRequest.INVENTORY_STOP:
                break

            # Payload: 2 RSSI bytes, antenna, channel, one skipped byte, then the tag data
            yield Tag(
                rssi=response.payload[0:2],
                antenna=response.payload[2],
                channel=response.payload[3],
                data=response.payload[5:])

    def stop_inventory_answer_mode(self) -> Response:
        """Send the stop command and return the reader's response."""
        return self.__execute(Command(CommandRequest.INVENTORY_STOP))
utils.py
from array import array
def hex_readable(data_bytes: bytes | array, separator: str = " ") -> str:
return separator.join("{:02X}".format(x) for x in data_bytes)
def ip_bytes(ip_str: str) -> bytearray:
    """Convert a dotted address with exactly four parts into four bytes."""
    octets = ip_str.split('.')
    assert len(octets) == 4
    numbers = [int(octet) for octet in octets]
    return bytearray(numbers)
def calculate_checksum(data: bytes) -> bytearray:
    """Compute the 16-bit frame checksum and return it as two bytes, high byte first.

    Bitwise CRC: register starts at 0xFFFF, each byte is XORed in, then the
    register is shifted right eight times with 0x8408 XORed in whenever the
    bit shifted out was set.
    """
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            carry = crc & 0x0001
            crc >>= 1
            if carry:
                crc ^= 0x8408
    high_byte = crc >> 0x08
    low_byte = crc & 0xFF
    return bytearray([high_byte, low_byte])
def calculate_rssi(rssi: bytes) -> int:
    """Interpret *rssi* as a signed big-endian integer."""
    signed_value = int.from_bytes(rssi, byteorder="big", signed=True)
    return signed_value


# main.py
from typing import Iterator
from response import Tag, ReaderSettings
from transport import Transport, SerialTransport, BaudRate, TcpTransport
from reader import Reader, StopType
# Demo script: run an inventory, read the settings, then change power and buzzer.
transport: Transport = SerialTransport('/dev/ttyUSB0', BaudRate.BPS_115200)
# Alternative: connect over TCP/IP instead of the serial port.
# transport: Transport = TcpTransport('192.168.1.250', 2022)
reader: Reader = Reader(transport)

# try/finally so the transport is closed even when one of the steps raises.
try:
    # 1. Inventory answer mode
    tags: Iterator[Tag] = reader.start_inventory_answer_mode(stop_type=StopType.TIME, value=3)  # Stop after 3 seconds
    for tag in tags:
        print(tag)

    # 2. Get reader settings
    reader_settings: ReaderSettings = reader.reader_settings()
    print(reader_settings)

    # 3. Set power & buzzer
    reader_settings.power = 5
    reader_settings.buzzer_time = False
    response = reader.set_reader_settings(reader_settings)
    print(f"Response set power & buzzer: {response.status}")
finally:
    reader.close()