Komunikasi EL-UHF-RC4 dengan Komputer §
Pada artikel ini, kami mencoba berkomunikasi dengan reader EL-UHF-RC4 series menggunakan protokol melalui koneksi Serial (RS232) atau TCP/IP.
Menggunakan koneksi Serial diperlukan kabel RS232 to USB untuk menghubungkan reader ke komputer. Sedangkan untuk koneksi TCP/IP, reader terhubung melalui kabel Ethernet ke komputer/router.
Kode §
🐍 Python §
Kode python di bawah ini adalah versi sederhana. Contoh kode Python lengkap tersedia dalam program demo (dibuat menggunakan bahasa Python), link download: https://cloud.electron.id/s/wSap3C2PiWXz4Jj
transport.py
§
import glob
import sys
from enum import Enum
from abc import ABC , abstractmethod
from socket import socket, AF_INET , SOCK_STREAM
from typing import TypeVar
import serial
T = TypeVar( 'T' , bound = 'Parent' )
class BaudRate ( Enum ):
BPS_9600 = 0x 00
BPS_19200 = 0x 01
BPS_38400 = 0x 02
BPS_57600 = 0x 03
BPS_115200 = 0x 04
_ignore_ = [ "INT" ]
INT = []
def __str__ (self) -> str :
return f ' {self .to_int } bps'
@ property
def to_int (self) -> int :
return self . INT [ self .value]
@ classmethod
def from_int (cls, value: int ) -> T:
for baud_rate in BaudRate:
if baud_rate.to_int == value:
return baud_rate
BaudRate. INT = [ 9600 , 19200 , 38400 , 57600 , 115200 ]
class Transport ( ABC ):
@abstractmethod
def connect (self, ** kwargs) -> None :
raise NotImplementedError
@abstractmethod
def reconnect (self, ** kwargs) -> bytes :
raise NotImplementedError
@abstractmethod
def read_bytes (self, ** kwargs) -> bytes :
raise NotImplementedError
@abstractmethod
def write_bytes (self, buffer: bytes ) -> None :
raise NotImplementedError
@abstractmethod
def clear_buffer (self) -> None :
raise NotImplementedError
@abstractmethod
def close (self) -> None :
raise NotImplementedError
class TcpTransport ( Transport ):
def __init__ (self, ip_address: str , port: int , timeout: int = 6 ) -> None :
self .ip_address: str = ip_address
self .port: int = port
self .timeout: int = timeout
self .socket: socket | None = None
def __str__ (self) -> str :
return f 'TcpTransport(ip_address: {self .ip_address } , port: {self .port } , timeout: {self .timeout } )'
def connect (self) -> None :
self .socket = socket( AF_INET , SOCK_STREAM )
self .socket.settimeout( self .timeout)
self .socket.connect(( self .ip_address, self .port))
def reconnect (self, ip_address: str | None = None , port: int | None = None ,
timeout: int | None = None ) -> None :
self .socket.close()
self .ip_address = ip_address if ip_address else self .ip_address
self .timeout = timeout if timeout else self .timeout
self .connect()
def read_bytes (self, length: int ) -> bytes :
return self .socket.recv(length)
def write_bytes (self, buffer: bytes ) -> None :
self .socket.sendall(buffer)
def clear_buffer (self) -> None :
# self.socket.recv(1024)
pass
def close (self) -> None :
self .socket.close()
class SerialTransport ( Transport ):
def __init__ (self, serial_port: str , baud_rate: BaudRate, timeout: int = 0.5 ) -> None :
self .serial_port = serial_port
self .baud_rate = baud_rate
self .timeout = timeout
self .serial = serial.Serial( self .serial_port, self .baud_rate.to_int,
timeout = timeout, write_timeout = timeout * 2 )
def __str__ (self) -> str :
return f 'SerialTransport(port: {self .serial_port } , baud_rate: {self .baud_rate } )'
@ classmethod
def scan (cls, timeout: int = 1 ) -> list[ str ]:
result: list[ str ] = []
if sys.platform.startswith( "win" ): # Windows
ports = [ "COM %s " % (i + 1 ) for i in range ( 15 )]
elif sys.platform.startswith( "linux" ) or sys.platform.startswith( "cygwin" ): # Linux
# this excludes your current terminal "/dev/tty"
ports = glob.glob( "/dev/tty[A-Za-z]*" )
elif sys.platform.startswith( "darwin" ): # Mac OS
ports = glob.glob( "/dev/tty.*" )
else :
raise EnvironmentError ( "Unsupported platform" )
for port in ports:
try :
s = serial.Serial(port, timeout = timeout)
s.close()
result.append(port)
except serial.SerialException as _:
pass
return result
def connect (self, ** kwargs):
pass
def reconnect (self, serial_port: str | None = None , baud_rate: BaudRate | None = None ,
timeout: int | None = None ) -> None :
self .close()
self .serial_port = serial_port if serial_port else self .serial.port
self .baud_rate = baud_rate if baud_rate else BaudRate.from_int( self .serial.baudrate)
self .timeout = timeout if timeout else self .serial.timeout
self .serial = serial.Serial( self .serial_port, self .baud_rate.to_int,
timeout = self .timeout, write_timeout = self .timeout * 2 )
def read_bytes (self, length: int ) -> bytes :
response = self .serial.read(length)
return response
def write_bytes (self, buffer: bytes ) -> None :
self .serial.write(buffer)
def clear_buffer (self) -> None :
self .serial.reset_input_buffer()
self .serial.reset_output_buffer()
def close (self) -> None :
self .serial.close()
command.py
§
from enum import Enum
from utils import calculate_checksum
HEADER = 0x CF
BROADCAST_ADDRESS = 0x FF
class CommandOption ( Enum ):
SET = 0x 01
GET = 0x 02
class CommandRequest ( Enum ):
SET_POWER = 0x 0053
GET_DEVICE_INFO = 0x 0070
INVENTORY_ISO_CONTINUE = 0x 0001
INVENTORY_STOP = 0x 0002
INVENTORY_ACTIVE = 0x 0001
class Command ( object ):
def __init__ (self,
command: CommandRequest,
address = BROADCAST_ADDRESS ,
data: bytes | bytearray = bytearray ()) -> None :
self .address = address
self .command = command
self .data = data
def serialize (self, with_checksum: bool = True ) -> bytes :
base_data = bytearray (
[ HEADER , self .address]) + \
self .command.value.to_bytes( 2 , "big" ) + \
bytearray ([ len ( self .data)]) + \
bytearray ( self .data)
if with_checksum:
checksum = calculate_checksum(base_data)
base_data.extend(checksum)
return base_data
response.py
§
from dataclasses import dataclass
from enum import Enum
from command import HEADER , CommandRequest
from utils import calculate_checksum, hex_readable
class Status ( Enum ):
SUCCESS = 0x 00
WRONG_PARAM = 0x 01
CMD_EXECUTION_FAILED = 0x 02
RESERVE = 0x 03
NO_COUNT_LABEL = 0x 12
TIMEOUT = 0x 14
TAG_RESPONSE_ERROR = 0x 15
AUTHENTICATION_FAILED = 0x 16
WRONG_PASSWORD = 0x 17
NO_MORE_DATA = 0x FF
class TagStatus ( Enum ):
NO_ERROR = 0x FF
TIMEOUT = 0x 14
OTHER_ERROR = 0x 81
STORAGE_AREA_ERROR = 0x 82
STORAGE_LOCK = 0x 83
INSUFFICIENT_POWER = 0x 84
NO_POWER = 0x 85
class InventoryStatus ( Enum ):
SUCCESS = 0x 00
WRONG_PARAM = 0x 01
CMD_EXECUTION_FAILED = 0x 02
NO_COUNT_LABEL = 0x 12
EXCEED_MAX_TRANSMIT_SERIAL = 0x 17
@dataclass
class Tag :
rssi: bytes
antenna: int
channel: int
data: bytes
count: int = 1
def __str__ (self) -> str :
return f 'Tag(rssi: { hex_readable( self .rssi) } , ' \
f 'antenna: {self .antenna } , channel: {self .channel } , ' \
f 'data: { hex_readable( self .data) } )'
class Response :
def __init__ (self, response: bytes ) -> None :
if response is None :
raise ValueError ( "Response is None" )
header_section: bytes = response[ 0 : 5 ]
assert header_section[ 0 ] == HEADER
self .header: int = response[ 0 ]
self .address: int = response[ 1 ]
self .command: CommandRequest = CommandRequest( int .from_bytes(response[ 2 : 4 ], "big" ))
self .length: int = response[ 4 ]
self .status: Status = Status(response[ 5 ])
__body_n_checksum_section: bytes = response[ 6 : 4 + self .length + 2 + 1 ]
self .payload: bytes = __body_n_checksum_section[ 0 : - 2 ]
self .checksum: bytes = __body_n_checksum_section[ - 2 :]
# Verify checksum
data = bytearray (header_section)
data.extend( bytearray ([ self .status.value]))
if self .payload:
data.extend( self .payload)
crc_msb, crc_lsb = calculate_checksum(data)
assert self .checksum[ 0 ] == crc_msb and self .checksum[ 1 ] == crc_lsb
reader.py
§
from enum import Enum
from typing import Iterator
from command import CommandRequest, Command
from response import Response, InventoryStatus, Tag
from transport import Transport
from utils import hex_readable
class StopType ( Enum ):
""" \
TIME: int = According to the time (in seconds)
NUMBER: int = According to the number or cycles
"""
TIME = 0x 00
NUMBER = 0x 01
class Reader :
def __init__ (self, transport: Transport) -> None :
super (). __init__ ()
self .transport = transport
self .transport.connect()
def close (self) -> None :
self .transport.close()
def __receive_response (self) -> bytes | None :
# Get header section
response_header_section: bytes = self .transport.read_bytes( length = 5 )
if not response_header_section:
return
assert len (response_header_section) == 5
# Get body section
body_length: int = response_header_section[ - 1 ]
response_body_section: bytes = self .transport.read_bytes( length = body_length + 2 ) # 2(checksum)
return response_header_section + response_body_section
def set_power (self, power: int ) -> Response:
cmd_request: CommandRequest = CommandRequest. SET_POWER
command: Command = Command(cmd_request, data = bytearray ([power, 0x 00 ]))
# Send request
self .transport.write_bytes(command.serialize())
# Receive response
raw_response: bytes | None = self .__receive_response()
if raw_response is None :
raise RuntimeError ( "No response from reader." )
response: Response = Response(raw_response)
# Validation response
assert response.command == cmd_request
return response
def start_inventory_answer_mode (self, stop_type: StopType, value: int ) -> Iterator[Tag]:
cmd_request: CommandRequest = CommandRequest. INVENTORY_ISO_CONTINUE
data = bytearray ([stop_type.value])
data.extend(value.to_bytes( 4 , "big" ))
command = Command(cmd_request, data = data)
# Send request
self .transport.write_bytes(command.serialize())
while True :
raw_response: bytes | None = self .__receive_response()
if raw_response is None :
return
response: Response = Response(raw_response)
if response.command == CommandRequest. INVENTORY_STOP or not response.payload:
print ( "Inventory finished!" )
break
tag: Tag = Tag(
rssi = response.payload[ 0 : 2 ],
antenna = response.payload[ 2 ],
channel = response.payload[ 3 ],
data = response.payload[ 5 :])
yield tag
def stop_inventory_answer_mode (self) -> Response:
cmd_request: CommandRequest = CommandRequest. INVENTORY_STOP
command: Command = Command(cmd_request)
# Send request
self .transport.write_bytes(command.serialize())
# Receive response
response: Response = Response( self .__receive_response())
# Validation response
assert response.command == cmd_request
return response
utils.py
§
from array import array
def hex_readable (data_bytes: bytes | array, separator: str = " " ) -> str :
return separator.join( " { :02X } " .format(x) for x in data_bytes)
def ip_bytes (ip_str: str ) -> bytearray :
ip_str_split = ip_str.split( '.' )
assert len (ip_str_split) == 4
return bytearray ([ int (ip) for ip in ip_str_split])
def calculate_checksum (data: bytes ) -> bytearray :
value = 0x FFFF
for d in data:
value ^= d
for _ in range ( 8 ):
value = (value >> 1 ) ^ 0x 8408 if value & 0x 0001 else (value >> 1 )
crc_msb = value >> 0x 08
crc_lsb = value & 0x FF
return bytearray ([crc_msb, crc_lsb])
def calculate_rssi (rssi: bytes ) -> int :
return int .from_bytes(rssi, "big" , signed = True )
main.py
§
from typing import Iterator
from response import Tag
from transport import Transport, SerialTransport, BaudRate, TcpTransport
from reader import Reader, StopType
transport: Transport = SerialTransport( '/dev/ttyUSB0' , BaudRate. BPS_115200 )
# transport: Transport = TcpTransport('192.168.1.250', 2022)
reader: Reader = Reader(transport)
# 1. Inventory answer mode
tags: Iterator[Tag] = reader.start_inventory_answer_mode( stop_type = StopType. TIME , value = 3 ) # Stop after 3 seconds
for tag in tags:
print (tag)
# 2. Set power
# reader.set_power(10)
reader.close()