''' GNU LESSER GENERAL PUBLIC LICENSE Version 3, 29 June 2007 Copyright (C) 2017 - 2018 Richard.Hu Everyone is permitted to copy and distribute verbatim copies of this license document, but changing it is not allowed. This version of the GNU Lesser General Public License incorporates the terms and conditions of version 3 of the GNU General Public License, supplemented by the additional permissions listed below. 0. Additional Definitions. As used herein, "this License" refers to version 3 of the GNU Lesser General Public License, and the "GNU GPL" refers to version 3 of the GNU General Public License. "The Library" refers to a covered work governed by this License, other than an Application or a Combined Work as defined below. An "Application" is any work that makes use of an interface provided by the Library, but which is not otherwise based on the Library. Defining a subclass of a class defined by the Library is deemed a mode of using an interface provided by the Library. A "Combined Work" is a work produced by combining or linking an Application with the Library. The particular version of the Library with which the Combined Work was made is also called the "Linked Version". The "Minimal Corresponding Source" for a Combined Work means the Corresponding Source for the Combined Work, excluding any source code for portions of the Combined Work that, considered in isolation, are based on the Application, and not on the Linked Version. The "Corresponding Application Code" for a Combined Work means the object code and/or source code for the Application, including any data and utility programs needed for reproducing the Combined Work from the Application, but excluding the System Libraries of the Combined Work. 1. Exception to Section 3 of the GNU GPL. You may convey a covered work under sections 3 and 4 of this License without being bound by section 3 of the GNU GPL. 2. Conveying Modified Versions. If you modify a copy of the Library, and, in your modifications, a facility refers to a function or data to be supplied by an Application that uses the facility (other than as an argument passed when the facility is invoked), then you may convey a copy of the modified version: a) under this License, provided that you make a good faith effort to ensure that, in the event an Application does not supply the function or data, the facility still operates, and performs whatever part of its purpose remains meaningful, or b) under the GNU GPL, with none of the additional permissions of this License applicable to that copy. 3. Object Code Incorporating Material from Library Header Files. The object code form of an Application may incorporate material from a header file that is part of the Library. You may convey such object code under terms of your choice, provided that, if the incorporated material is not limited to numerical parameters, data structure layouts and accessors, or small macros, inline functions and templates (ten or fewer lines in length), you do both of the following: a) Give prominent notice with each copy of the object code that the Library is used in it and that the Library and its use are covered by this License. b) Accompany the object code with a copy of the GNU GPL and this license document. 4. Combined Works. You may convey a Combined Work under terms of your choice that, taken together, effectively do not restrict modification of the portions of the Library contained in the Combined Work and reverse engineering for debugging such modifications, if you also do each of the following: a) Give prominent notice with each copy of the Combined Work that the Library is used in it and that the Library and its use are covered by this License. b) Accompany the Combined Work with a copy of the GNU GPL and this license document. c) For a Combined Work that displays copyright notices during execution, include the copyright notice for the Library among these notices, as well as a reference directing the user to the copies of the GNU GPL and this license document. d) Do one of the following: 0) Convey the Minimal Corresponding Source under the terms of this License, and the Corresponding Application Code in a form suitable for, and under terms that permit, the user to recombine or relink the Application with a modified version of the Linked Version to produce a modified Combined Work, in the manner specified by section 6 of the GNU GPL for conveying Corresponding Source. 1) Use a suitable shared library mechanism for linking with the Library. A suitable mechanism is one that (a) uses at run time a copy of the Library already present on the user's computer system, and (b) will operate properly with a modified version of the Library that is interface-compatible with the Linked Version. e) Provide Installation Information, but only if you would otherwise be required to provide such information under section 6 of the GNU GPL, and only to the extent that such information is necessary to install and execute a modified version of the Combined Work produced by recombining or relinking the Application with a modified version of the Linked Version. (If you use option 4d0, the Installation Information must accompany the Minimal Corresponding Source and Corresponding Application Code. If you use option 4d1, you must provide the Installation Information in the manner specified by section 6 of the GNU GPL for conveying Corresponding Source.) 5. Combined Libraries. You may place library facilities that are a work based on the Library side by side in a single library together with other library facilities that are not Applications and are not covered by this License, and convey such a combined library under terms of your choice, if you do both of the following: a) Accompany the combined library with a copy of the same work based on the Library, uncombined with any other library facilities, conveyed under the terms of this License. b) Give prominent notice with the combined library that part of it is a work based on the Library, and explaining where to find the accompanying uncombined form of the same work. 6. Revised Versions of the GNU Lesser General Public License. The Free Software Foundation may publish revised and/or new versions of the GNU Lesser General Public License from time to time. Such new versions will be similar in spirit to the present version, but may differ in detail to address new problems or concerns. Each version is given a distinguishing version number. If the Library as you received it specifies that a certain numbered version of the GNU Lesser General Public License "or any later version" applies to it, you have the option of following the terms and conditions either of that published version or of any later version published by the Free Software Foundation. If the Library as you received it does not specify a version number of the GNU Lesser General Public License, you may choose any version of the GNU Lesser General Public License ever published by the Free Software Foundation. If the Library as you received it specifies that a proxy can decide whether future versions of the GNU Lesser General Public License shall apply, that proxy's public statement of acceptance of any version is permanent authorization for you to choose that version for the Library. ''' import string import uuid import socket import struct import threading import gzip import datetime import random from time import sleep from enum import Enum class StringResources: '''系统的资源类''' @staticmethod def ConnectedFailed(): return "连接失败" @staticmethod def UnknownError(): return "未知错误" @staticmethod def ErrorCode(): return "错误代号" @staticmethod def TextDescription(): return "文本描述" @staticmethod def ExceptionMessage(): return "错误信息:" @staticmethod def ExceptionStackTrace(): return "错误堆栈:" @staticmethod def ExceptopnTargetSite(): return "错误方法:" @staticmethod def ExceprionCustomer(): return "用户自定义方法出错:" @staticmethod def TokenCheckFailed(): return "令牌检查错误。" @staticmethod def SuccessText(): return "Success" @staticmethod def NotSupportedDataType(): return "输入的类型不支持,请重新输入" # Modbus相关 @staticmethod def ModbusTcpFunctionCodeNotSupport(): return "不支持的功能码" @staticmethod def ModbusTcpFunctionCodeOverBound(): return "读取的数据越界" @staticmethod def ModbusTcpFunctionCodeQuantityOver(): return "读取长度超过最大值" @staticmethod def ModbusTcpFunctionCodeReadWriteException(): return "读写异常" @staticmethod def ModbusTcpReadCoilException(): return "读取线圈异常" @staticmethod def ModbusTcpWriteCoilException(): return "写入线圈异常" @staticmethod def ModbusTcpReadRegisterException(): return "读取寄存器异常" @staticmethod def ModbusTcpWriteRegisterException(): return "写入寄存器异常" @staticmethod def ModbusAddressMustMoreThanOne(): return "地址值在起始地址为1的情况下,必须大于1" @staticmethod def MelsecPleaseReferToManulDocument(): return "请查看三菱的通讯手册来查看报警的具体信息" @staticmethod def MelsecReadBitInfo(): return "读取位变量数组只能针对位软元件,如果读取字软元件,请调用Read方法" @staticmethod def OmronStatus0(): return "通讯正常" @staticmethod def OmronStatus1(): return "消息头不是FINS" @staticmethod def OmronStatus2(): return "数据长度太长" @staticmethod def OmronStatus3(): return "该命令不支持" @staticmethod def OmronStatus20(): return "超过连接上限" @staticmethod def OmronStatus21(): return "指定的节点已经处于连接中" @staticmethod def OmronStatus22(): return "尝试去连接一个受保护的网络节点,该节点还未配置到PLC中" @staticmethod def OmronStatus23(): return "当前客户端的网络节点超过正常范围" @staticmethod def OmronStatus24(): return "当前客户端的网络节点已经被使用" @staticmethod def OmronStatus25(): return "所有的网络节点已经被使用" class OperateResult: '''结果对象类,可以携带额外的数据信息''' def __init__(self, err = 0, msg = ""): self.ErrorCode = err self.Message = msg # 是否成功的标志 IsSuccess = False # 操作返回的错误消息 Message = StringResources.SuccessText() # 错误码 ErrorCode = 0 # 返回显示的文本 def ToMessageShowString( self ): '''获取错误代号及文本描述''' return StringResources.ErrorCode() + ":" + str(self.ErrorCode) + "\r\n" + StringResources.TextDescription() + ":" + self.Message def CopyErrorFromOther(self, result): '''从另一个结果类中拷贝错误信息''' if result != None: self.ErrorCode = result.ErrorCode self.Message = result.Message @staticmethod def CreateFailedResult( result ): '''创建一个失败的结果对象''' failed = OperateResult() failed.ErrorCode = result.ErrorCode failed.Message = result.Message return failed @staticmethod def CreateSuccessResult(Content1=None,Content2=None,Content3=None,Content4=None,Content5=None,Content6=None,Content7=None,Content8=None,Content9=None,Content10=None): '''创建一个成功的对象''' success = OperateResult() success.IsSuccess = True success.Message = StringResources.SuccessText() if(Content2 == None and Content3 == None and Content4 == None and Content5 == None and Content6 == None and Content7 == None and Content8 == None and Content9 == None and Content10 == None) : success.Content = Content1 else: success.Content1 = Content1 success.Content2 = Content2 success.Content3 = Content3 success.Content4 = Content4 success.Content5 = Content5 success.Content6 = Content6 success.Content7 = Content7 success.Content8 = Content8 success.Content9 = Content9 success.Content10 = Content10 return success class SoftIncrementCount: '''一个简单的不持久化的序号自增类,采用线程安全实现,并允许指定最大数字,到达后清空从指定数开始''' start = 0 current = 0 maxValue = 100000000000000000000000000 hybirdLock = threading.Lock() def __init__(self, maxValue, start): '''实例化一个自增信息的对象,包括最大值''' self.maxValue = maxValue self.start = start def GetCurrentValue( self ): '''获取自增信息''' value = 0 self.hybirdLock.acquire() value = self.current self.current = self.current + 1 if self.current > self.maxValue: self.current = 0 self.hybirdLock.release() return value class INetMessage: '''数据消息的基本基类''' def ProtocolHeadBytesLength(self): '''协议头数据长度,也即是第一次接收的数据长度''' return 0 def GetContentLengthByHeadBytes(self): '''二次接收的数据长度''' return 0 def CheckHeadBytesLegal(self,toke): '''令牌检查是否成功''' return False def GetHeadBytesIdentity(self): '''获取头子节里的消息标识''' return 0 HeadBytes = bytes(0) ContentBytes = bytes(0) SendBytes = bytes(0) class S7Message (INetMessage): '''西门子s7协议的消息接收规则''' def ProtocolHeadBytesLength(self): '''协议头数据长度,也即是第一次接收的数据长度''' return 4 def GetContentLengthByHeadBytes(self): '''二次接收的数据长度''' if self.HeadBytes != None: return self.HeadBytes[2]*256 + self.HeadBytes[3]-4 else: return 0 def CheckHeadBytesLegal(self,token): '''令牌检查是否成功''' if self.HeadBytes != None: if self.HeadBytes[0] == 0x03 and self.HeadBytes[1] == 0x00: return True else: return False else: return False class MelsecA1EBinaryMessage(INetMessage): '''三菱的A兼容1E帧协议解析规则''' def ProtocolHeadBytesLength(self): '''协议头数据长度,也即是第一次接收的数据长度''' return 2 def GetContentLengthByHeadBytes(self): '''二次接收的数据长度''' contentLength = 0 if self.HeadBytes[1] == 0x5B: contentLength = 2 else: length = 0 if self.SendBytes[10] % 2 == 0: length = self.SendBytes[10] else: length = self.SendBytes[10] + 1 if self.HeadBytes[0] == 0x80: contentLength = int(length / 2) elif self.HeadBytes[0] == 0x81: contentLength = self.SendBytes[10] * 2 elif self.HeadBytes[0] == 0x82: contentLength = 0 elif self.HeadBytes[0] == 0x83: contentLength = 0 # 在A兼容1E协议中,写入值后,若不发生异常,只返回副标题 + 结束代码(0x00) # 这已经在协议头部读取过了,后面要读取的长度为0(contentLength=0) return contentLength def CheckHeadBytesLegal(self,token): '''令牌检查是否成功''' if self.HeadBytes != None: if self.HeadBytes[0] - self.SendBytes[0] == 0x80: return True else: return False else: return False class MelsecQnA3EBinaryMessage(INetMessage): '''三菱的Qna兼容3E帧协议解析规则''' def ProtocolHeadBytesLength(self): '''协议头数据长度,也即是第一次接收的数据长度''' return 9 def GetContentLengthByHeadBytes(self): '''二次接收的数据长度''' if self.HeadBytes != None: return self.HeadBytes[8] * 256 + self.HeadBytes[7] else: return 0 def CheckHeadBytesLegal(self,token): '''令牌检查是否成功''' if self.HeadBytes != None: if self.HeadBytes[0] == 0xD0 and self.HeadBytes[1] == 0x00: return True else: return False else: return False class MelsecQnA3EAsciiMessage(INetMessage): '''三菱的Qna兼容3E帧的ASCII协议解析规则''' def ProtocolHeadBytesLength(self): '''协议头数据长度,也即是第一次接收的数据长度''' return 18 def GetContentLengthByHeadBytes(self): '''二次接收的数据长度''' if self.HeadBytes != None: return int(self.HeadBytes[14:18].decode('ascii'),16) else: return 0 def CheckHeadBytesLegal(self,token): '''令牌检查是否成功''' if self.HeadBytes != None: if self.HeadBytes[0] == ord('D') and self.HeadBytes[1] == ord('0') and self.HeadBytes[2] == ord('0') and self.HeadBytes[3] == ord('0'): return True else: return False else: return False class ModbusTcpMessage (INetMessage): '''Modbus-Tcp协议的信息''' def ProtocolHeadBytesLength(self): '''协议头数据长度,也即是第一次接收的数据长度''' return 6 def GetContentLengthByHeadBytes(self): '''二次接收的数据长度''' if self.HeadBytes != None: return self.HeadBytes[4] * 256 + self.HeadBytes[5] else: return 0 def CheckHeadBytesLegal(self,token): '''令牌检查是否成功''' return True def GetHeadBytesIdentity(self): '''获取头子节里的消息标识''' return self.HeadBytes[0] * 256 + self.HeadBytes[1] class HslMessage (INetMessage): '''本组件系统使用的默认的消息规则,说明解析和反解析规则的''' def ProtocolHeadBytesLength(self): '''协议头数据长度,也即是第一次接收的数据长度''' return 32 def GetContentLengthByHeadBytes(self): '''二次接收的数据长度''' if self.HeadBytes != None: buffer = bytearray(4) buffer[0:4] = self.HeadBytes[28:32] return struct.unpack('h',data)[0] def TransUInt16(self, buffer, index ): '''从缓存中提取ushort结果''' data = self.TransByteArray(buffer,index,2) return struct.unpack('>H',data)[0] def TransInt32(self, buffer, index ): '''从缓存中提取int结果''' data = self.TransByteArray(buffer,index,4) return struct.unpack('>i',data)[0] def TransUInt32(self, buffer, index ): '''从缓存中提取uint结果''' data = self.TransByteArray(buffer,index,4) return struct.unpack('>I',data)[0] def TransInt64(self, buffer, index ): '''从缓存中提取long结果''' data = self.TransByteArray(buffer,index,8) return struct.unpack('>q',data)[0] def TransUInt64(self, buffer, index ): '''从缓存中提取ulong结果''' data = self.TransByteArray(buffer,index,8) return struct.unpack('>Q',data)[0] def TransSingle(self, buffer, index ): '''从缓存中提取float结果''' data = self.TransByteArray(buffer,index,4) return struct.unpack('>f',data)[0] def TransDouble(self, buffer, index ): '''从缓存中提取double结果''' data = self.TransByteArray(buffer,index,8) return struct.unpack('>d',data)[0] def Int16ArrayTransByte(self, values ): '''short数组变量转化缓存数据,需要传入short数组 -> bytearray''' if (values == None) : return None buffer = bytearray(len(values) * 2) for i in range(len(values)): buffer[(i*2): (i*2+2)] = struct.pack('>h',values[i]) return buffer def UInt16ArrayTransByte(self, values ): '''ushort数组变量转化缓存数据,需要传入ushort数组 -> bytearray''' if (values == None) : return None buffer = bytearray(len(values) * 2) for i in range(len(values)): buffer[(i*2): (i*2+2)] = struct.pack('>H',values[i]) return buffer def Int32ArrayTransByte(self, values ): '''int数组变量转化缓存数据,需要传入int数组 -> bytearray''' if (values == None) : return None buffer = bytearray(len(values) * 4) for i in range(len(values)): buffer[(i*4): (i*4+4)] = struct.pack('>i',values[i]) return buffer def UInt32ArrayTransByte(self, values ): '''uint数组变量转化缓存数据,需要传入uint数组 -> bytearray''' if (values == None) : return None buffer = bytearray(len(values) * 4) for i in range(len(values)): buffer[(i*4): (i*4+4)] = struct.pack('>I',values[i]) return buffer def Int64ArrayTransByte(self, values ): '''long数组变量转化缓存数据,需要传入long数组 -> bytearray''' if (values == None) : return None buffer = bytearray(len(values) * 8) for i in range(len(values)): buffer[(i*8): (i*8+8)] = struct.pack('>q',values[i]) return buffer def UInt64ArrayTransByte(self, values ): '''ulong数组变量转化缓存数据,需要传入ulong数组 -> bytearray''' if (values == None) : return None buffer = bytearray(len(values) * 8) for i in range(len(values)): buffer[(i*8): (i*8+8)] = struct.pack('>Q',values[i]) return buffer def FloatArrayTransByte(self, values ): '''float数组变量转化缓存数据,需要传入float数组 -> bytearray''' if (values == None) : return None buffer = bytearray(len(values) * 4) for i in range(len(values)): buffer[(i*4): (i*4+4)] = struct.pack('>f',values[i]) return buffer def DoubleArrayTransByte(self, values ): '''double数组变量转化缓存数据,需要传入double数组 -> bytearray''' if (values == None) : return None buffer = bytearray(len(values) * 8) for i in range(len(values)): buffer[(i*8): (i*8+8)] = struct.pack('>d',values[i]) return buffer class ReverseWordTransform(ByteTransform): '''按照字节错位的数据转换类''' def __init__(self): '''初始化方法,重新设置DataFormat''' self.DataFormat = DataFormat.ABCD IsStringReverse = False def ReverseBytesByWord( self, buffer, index, length ): '''按照字节错位的方法 -> bytearray''' if buffer == None: return None data = self.TransByteArray(buffer,index,length) for i in range(len(data)//2): data[i*2+0],data[i*2+1]= data[i*2+1],data[i*2+0] return data def ReverseAllBytesByWord( self, buffer ): '''按照字节错位的方法 -> bytearray''' return self.ReverseBytesByWord(buffer,0,len(buffer)) def TransInt16( self, buffer, index ): '''从缓存中提取short结果''' data = self.ReverseBytesByWord(buffer,index,2) return struct.unpack(' len(InBytes) * 8: length = len(InBytes) * 8 buffer = [] for i in range(length): index = i // 8 offect = i % 8 temp = 0 if offect == 0 : temp = 0x01 elif offect == 1 : temp = 0x02 elif offect == 2 : temp = 0x04 elif offect == 3 : temp = 0x08 elif offect == 4 : temp = 0x10 elif offect == 5 : temp = 0x20 elif offect == 6 : temp = 0x40 elif offect == 7 : temp = 0x80 if (InBytes[index] & temp) == temp: buffer.append(True) else: buffer.append(False) return buffer @staticmethod def BoolArrayToByte( array ): '''从bool数组变量变成byte数组''' if (array == None) : return None length = 0 if len(array) % 8 == 0: length = int(len(array) / 8) else: length = int(len(array) / 8) + 1 buffer = bytearray(length) for i in range(len(array)): index = i // 8 offect = i % 8 temp = 0 if offect == 0 : temp = 0x01 elif offect == 1 : temp = 0x02 elif offect == 2 : temp = 0x04 elif offect == 3 : temp = 0x08 elif offect == 4 : temp = 0x10 elif offect == 5 : temp = 0x20 elif offect == 6 : temp = 0x40 elif offect == 7 : temp = 0x80 if array[i] : buffer[index] += temp return buffer @staticmethod def HexStringToBytes( hex ): '''将hex字符串转化为byte数组''' return bytes.fromhex(hex) @staticmethod def BytesArrayExpandToLengthEven(array): '''扩充一个整型的数据长度为偶数个''' if len(array) % 2 == 1: array.append(0) return array @staticmethod def IsTwoBytesEquel( b1, start1, b2, start2, length ): '''判断两个字节的指定部分是否相同''' if b1 == None or b2 == None: return False for ii in range(length): if b1[ii+start1] != b2[ii+start2]: return False return True @staticmethod def TokenToBytes( token ): '''将uuid的token值转化成统一的bytes数组,方便和java,C#通讯''' buffer = bytearray(token.bytes) buffer[0],buffer[1],buffer[2],buffer[3] = buffer[3],buffer[2],buffer[1],buffer[0] buffer[4],buffer[5] = buffer[5],buffer[4] buffer[6],buffer[7] = buffer[7],buffer[6] return buffer @staticmethod def ArrayExpandToLength( value, length ): '''将数组扩充到指定的长度''' buffer = bytearray(length) if len(value) >= length: buffer[0:] = value[0:len(value)] else: buffer[0:len(value)] = value return buffer @staticmethod def ArrayExpandToLengthEven( value ): '''将数组扩充到偶数的长度''' if len(value) % 2 == 0: return value else: buffer = bytearray(len(value)+1) buffer[0:len(value)] = value return value @staticmethod def StringToUnicodeBytes( value ): '''获取字符串的unicode编码字符''' if value == None: return bytearray(0) buffer = value.encode('utf-16') if len(buffer) > 1 and buffer[0] == 255 and buffer[1] == 254: buffer = buffer[2:len(buffer)] return buffer @staticmethod def GetUniqueStringByGuidAndRandom(): '''获取一串唯一的随机字符串,长度为20,由Guid码和4位数的随机数组成,保证字符串的唯一性''' return SoftBasic.ByteToHexString(SoftBasic.TokenToBytes(uuid.uuid1()), None) + str(random.randint(12, 20)) class HslSecurity: @staticmethod def ByteEncrypt( enBytes ): '''加密方法,只对当前的程序集开放''' if (enBytes == None) : return None result = bytearray(len(enBytes)) for i in range(len(enBytes)): result[i] = enBytes[i] ^ 0xB5 return result @staticmethod def ByteDecrypt( deBytes ): '''解密方法,只对当前的程序集开放''' return HslSecurity.ByteEncrypt(deBytes) class SoftZipped: '''一个负责压缩解压数据字节的类''' @staticmethod def CompressBytes( inBytes ): '''压缩字节数据''' if inBytes == None : return None return gzip.compress( inBytes ) @staticmethod def Decompress( inBytes ): '''解压字节数据''' if inBytes == None : return None return gzip.decompress( inBytes ) class HslProtocol: '''用于本程序集访问通信的暗号说明''' @staticmethod def HeadByteLength(): '''规定所有的网络传输指令头都为32字节''' return 32 @staticmethod def ProtocolBufferSize(): '''所有网络通信中的缓冲池数据信息''' return 1024 @staticmethod def ProtocolCheckSecends(): '''用于心跳程序的暗号信息''' return 1 @staticmethod def ProtocolClientQuit(): '''客户端退出消息''' return 2 @staticmethod def ProtocolClientRefuseLogin(): '''因为客户端达到上限而拒绝登录''' return 3 @staticmethod def ProtocolClientAllowLogin(): return 4 @staticmethod def ProtocolUserString(): '''说明发送的只是文本信息''' return 1001 @staticmethod def ProtocolUserBytes(): '''发送的数据就是普通的字节数组''' return 1002 @staticmethod def ProtocolUserBitmap(): '''发送的数据就是普通的图片数据''' return 1003 @staticmethod def ProtocolUserException(): '''发送的数据是一条异常的数据,字符串为异常消息''' return 1004 @staticmethod def ProtocolFileDownload(): '''请求文件下载的暗号''' return 2001 @staticmethod def ProtocolFileUpload(): '''请求文件上传的暗号''' return 2002 @staticmethod def ProtocolFileDelete(): '''请求删除文件的暗号''' return 2003 @staticmethod def ProtocolFileCheckRight(): '''文件校验成功''' return 2004 @staticmethod def ProtocolFileCheckError(): '''文件校验失败''' return 2005 @staticmethod def ProtocolFileSaveError(): '''文件保存失败''' return 2006 @staticmethod def ProtocolFileDirectoryFiles(): '''请求文件列表的暗号''' return 2007 @staticmethod def ProtocolFileDirectories(): '''请求子文件的列表暗号''' return 2008 @staticmethod def ProtocolProgressReport(): '''进度返回暗号''' return 2009 @staticmethod def ProtocolNoZipped(): '''不压缩数据字节''' return 3001 @staticmethod def ProtocolZipped(): '''压缩数据字节''' return 3002 @staticmethod def CommandBytesBase( command, customer, token, data ): '''生成终极传送指令的方法,所有的数据均通过该方法出来''' _zipped = HslProtocol.ProtocolNoZipped() buffer = None _sendLength = 0 if data == None: buffer = bytearray(HslProtocol.HeadByteLength()) else: data = HslSecurity.ByteEncrypt( data ) if len(data) > 102400: data = SoftZipped.CompressBytes( data ) _zipped = HslProtocol.ProtocolZipped() buffer = bytearray( HslProtocol.HeadByteLength() + len(data) ) _sendLength = len(data) buffer[0:4] = struct.pack( '0: buffer[32:_sendLength+32]=data return buffer @staticmethod def CommandAnalysis( head, content ): '''解析接收到数据,先解压缩后进行解密''' if content != None: _zipped = struct.unpack(' 0 : Content[0:len(read.Content1)] = read.Content1 if len(read.Content2) > 0 : Content[len(read.Content1):len(Content)] = read.Content2 return OperateResult.CreateSuccessResult( Content ) def ReadFromCoreServer( self, send ): '''使用底层的数据报文来通讯,传入需要发送的消息,返回一条完整的数据指令''' result = OperateResult( ) self.interactiveLock.acquire() # 获取有用的网络通道,如果没有,就建立新的连接 resultSocket = self.GetAvailableSocket( ) if resultSocket.IsSuccess == False: self.isSocketError = True self.interactiveLock.release() result.CopyErrorFromOther( resultSocket ) return result read = self.ReadFromCoreSocketServer( resultSocket.Content, send ) if read.IsSuccess : self.isSocketError = False result.IsSuccess = read.IsSuccess result.Content = read.Content result.Message = StringResources.SuccessText # string tmp2 = BasicFramework.SoftBasic.ByteToHexString( result.Content, '-' ) else: self.isSocketError = True result.CopyErrorFromOther( read ) self.interactiveLock.release() if self.isPersistentConn==False: if resultSocket.Content != None: resultSocket.Content.close() return result def ReadFromCoreServerBase( self, socket, send ): '''使用底层的数据报文来通讯,传入需要发送的消息,返回最终的数据结果,被拆分成了头子节和内容字节信息''' self.iNetMessage.SendBytes = send sendResult = self.Send( socket, send ) if sendResult.IsSuccess == False: if socket!= None : socket.close( ) return OperateResult.CreateFailedResult( sendResult ) # 接收超时时间大于0时才允许接收远程的数据 if (self.receiveTimeOut >= 0): # 接收数据信息 resultReceive = self.ReceiveMessage(socket, 10000, self.iNetMessage) if resultReceive.IsSuccess == False: socket.close( ) return OperateResult( msg = "Receive data timeout: " + str(self.receiveTimeOut ) + " Msg:"+ resultReceive.Message) return OperateResult.CreateSuccessResult( resultReceive.Content.HeadBytes, resultReceive.Content.ContentBytes ) else: return OperateResult.CreateSuccessResult( bytearray(0), bytearray(0) ) def GetBoolResultFromBytes( self, result ): '''将指定的OperateResult类型转化''' return ByteTransformHelper.GetBoolResultFromBytes( result, self.byteTransform) def GetByteResultFromBytes( self, result ): '''将指定的OperateResult类型转化''' return ByteTransformHelper.GetByteResultFromBytes( result, self.byteTransform) def GetInt16ResultFromBytes( self, result ): '''将指定的OperateResult类型转化''' return ByteTransformHelper.GetInt16ResultFromBytes( result, self.byteTransform) def GetUInt16ResultFromBytes( self, result ): '''将指定的OperateResult类型转化''' return ByteTransformHelper.GetUInt16ResultFromBytes( result, self.byteTransform) def GetInt32ResultFromBytes( self, result ): '''将指定的OperateResult类型转化''' return ByteTransformHelper.GetInt32ResultFromBytes( result, self.byteTransform ) def GetUInt32ResultFromBytes( self, result ): '''将指定的OperateResult类型转化''' return ByteTransformHelper.GetUInt32ResultFromBytes( result, self.byteTransform ) def GetInt64ResultFromBytes( self, result ): '''将指定的OperateResult类型转化''' return ByteTransformHelper.GetInt64ResultFromBytes( result, self.byteTransform ) def GetUInt64ResultFromBytes( self, result ): '''将指定的OperateResult类型转化''' return ByteTransformHelper.GetUInt64ResultFromBytes( result, self.byteTransform ) def GetSingleResultFromBytes( self, result ): '''将指定的OperateResult类型转化''' return ByteTransformHelper.GetSingleResultFromBytes( result, self.byteTransform ) def GetDoubleResultFromBytes( self, result ): '''将指定的OperateResult类型转化''' return ByteTransformHelper.GetDoubleResultFromBytes( result, self.byteTransform ) def GetStringResultFromBytes( self, result ): '''将指定的OperateResult类型转化''' return ByteTransformHelper.GetStringResultFromBytes( result, self.byteTransform ) class NetworkDeviceBase(NetworkDoubleBase): '''设备类的基类,提供了基础的字节读写方法''' # 单个数据字节的长度,西门子为2,三菱,欧姆龙,modbusTcp就为1 WordLength = 1 def Read( self, address, length ): '''从设备读取原始数据''' return OperateResult( ) def Write( self, address, value ): '''将原始数据写入设备''' return OperateResult() def ReadInt16( self, address, length = None ): '''读取设备的short类型的数据''' if(length == None): return self.GetInt16ResultFromBytes( self.Read( address, self.WordLength ) ) else: read = self.Read(address,length*self.WordLength) if read.IsSuccess == False: return OperateResult.CreateFailedResult(read) return OperateResult.CreateSuccessResult(self.byteTransform.TransInt16Array(read.Content,0,length)) def ReadUInt16( self, address, length = None ): '''读取设备的ushort数据类型的数据''' if length == None: return self.GetUInt16ResultFromBytes(self.Read(address,self.WordLength)) else: read = self.Read(address,length*self.WordLength) if read.IsSuccess == False: return OperateResult.CreateFailedResult(read) return OperateResult.CreateSuccessResult(self.byteTransform.TransUInt16Array(read.Content,0,length)) def ReadInt32( self, address, length = None ): '''读取设备的int类型的数据''' if length == None: return self.GetInt32ResultFromBytes( self.Read( address, 2 * self.WordLength ) ) else: read = self.Read(address,2*length*self.WordLength) if read.IsSuccess == False: return OperateResult.CreateFailedResult(read) return OperateResult.CreateSuccessResult(self.byteTransform.TransInt32Array(read.Content,0,length)) def ReadUInt32( self, address, length = None ): '''读取设备的uint数据类型的数据''' if length == None: return self.GetUInt32ResultFromBytes(self.Read(address,2 * self.WordLength)) else: read = self.Read(address,2*length*self.WordLength) if read.IsSuccess == False: return OperateResult.CreateFailedResult(read) return OperateResult.CreateSuccessResult(self.byteTransform.TransUInt32Array(read.Content,0,length)) def ReadFloat( self, address, length = None ): '''读取设备的float类型的数据''' if length == None: return self.GetSingleResultFromBytes( self.Read( address, 2 * self.WordLength ) ) else: read = self.Read(address,2*length*self.WordLength) if read.IsSuccess == False: return OperateResult.CreateFailedResult(read) return OperateResult.CreateSuccessResult(self.byteTransform.TransSingleArray(read.Content,0,length)) def ReadInt64( self, address, length = None ): '''读取设备的long类型的数组''' if length == None: return self.GetInt64ResultFromBytes( self.Read( address, 4 * self.WordLength) ) else: read = self.Read(address,4*length*self.WordLength) if read.IsSuccess == False: return OperateResult.CreateFailedResult(read) return OperateResult.CreateSuccessResult(self.byteTransform.TransInt64Array(read.Content,0,length)) def ReadUInt64( self, address, length = None ): '''读取设备的long类型的数组''' if length == None: return self.GetUInt64ResultFromBytes( self.Read( address, 4 * self.WordLength) ) else: read = self.Read(address,4*length*self.WordLength) if read.IsSuccess == False: return OperateResult.CreateFailedResult(read) return OperateResult.CreateSuccessResult(self.byteTransform.TransUInt64Array(read.Content,0,length)) def ReadDouble( self, address, length = None ): '''读取设备的long类型的数组''' if length == None: return self.GetDoubleResultFromBytes( self.Read( address, 4 * self.WordLength) ) else: read = self.Read(address,4*length*self.WordLength) if read.IsSuccess == False: return OperateResult.CreateFailedResult(read) return OperateResult.CreateSuccessResult(self.byteTransform.TransDoubleArray(read.Content,0,length)) def ReadString( self, address, length ): return self.GetStringResultFromBytes( self.Read( address, length ) ) def WriteInt16( self, address, value ): '''向设备中写入short数据或是数组,返回是否写入成功''' if type(value) == list: return self.Write( address, self.byteTransform.Int16ArrayTransByte( value ) ) else: return self.WriteInt16( address, [value] ) def WriteUInt16( self, address, value ): '''向设备中写入short数据或是数组,返回是否写入成功''' if type(value) == list: return self.Write( address, self.byteTransform.UInt16ArrayTransByte( value ) ) else: return self.WriteUInt16( address, [value] ) def WriteInt32( self, address, value ): '''向设备中写入int数据,返回是否写入成功''' if type(value) == list: return self.Write( address, self.byteTransform.Int32ArrayTransByte(value) ) else: return self.WriteInt32( address, [value]) def WriteUInt32( self, address, value): '''向设备中写入uint数据,返回是否写入成功''' if type(value) == list: return self.Write( address, self.byteTransform.UInt32ArrayTransByte(value) ) else: return self.WriteUInt32( address, [value] ) def WriteFloat( self, address, value ): '''向设备中写入float数据,返回是否写入成功''' if type(value) == list: return self.Write( address, self.byteTransform.FloatArrayTransByte(value) ) else: return self.WriteFloat(address, [value]) def WriteInt64( self, address, value ): '''向设备中写入long数据,返回是否写入成功''' if type(value) == list: return self.Write( address, self.byteTransform.Int64ArrayTransByte(value)) else: return self.WriteInt64( address, [value] ) def WriteUInt64( self, address, value ): '''向设备中写入ulong数据,返回是否写入成功''' if type(value) == list: return self.Write( address, self.byteTransform.UInt64ArrayTransByte(value)) else: return self.WriteUInt64( address, [value] ) def WriteDouble( self, address, value ): '''向设备中写入double数据,返回是否写入成功''' if type(value) == list: return self.Write( address, self.byteTransform.DoubleArrayTransByte(value) ) else: return self.WriteDouble( address, [value] ) def WriteString( self, address, value, length = None ): '''向设备中写入string数据,编码为ascii,返回是否写入成功''' if length == None: return self.Write( address, self.byteTransform.StringTransByte( value, 'ascii' ) ) else: return self.Write( address, SoftBasic.ArrayExpandToLength(self.byteTransform.StringTransByte( value, 'ascii' ), length)) def WriteUnicodeString( self, address, value, length = None): '''向设备中写入string数据,编码为unicode,返回是否写入成功''' if length == None: temp = SoftBasic.StringToUnicodeBytes(value) return self.Write( address, temp ) else: temp = SoftBasic.StringToUnicodeBytes(value) temp = SoftBasic.ArrayExpandToLength( temp, length * 2 ) return self.Write( address, temp ) class ModbusInfo: '''Modbus协议相关的一些信息''' @staticmethod def ReadCoil(): '''读取线圈功能码''' return 0x01 @staticmethod def ReadDiscrete(): '''读取寄存器功能码''' return 0x02 @staticmethod def ReadRegister(): '''读取寄存器功能码''' return 0x03 @staticmethod def ReadInputRegister(): '''读取输入寄存器''' return 0x04 @staticmethod def WriteOneCoil(): '''写单个寄存器''' return 0x05 @staticmethod def WriteOneRegister(): '''写单个寄存器''' return 0x06 @staticmethod def WriteCoil(): '''写多个线圈''' return 0x0F @staticmethod def WriteRegister(): '''写多个寄存器''' return 0x10 @staticmethod def FunctionCodeNotSupport(): '''不支持该功能码''' return 0x01 @staticmethod def FunctionCodeOverBound(): '''该地址越界''' return 0x02 @staticmethod def FunctionCodeQuantityOver(): '''读取长度超过最大值''' return 0x03 @staticmethod def FunctionCodeReadWriteException(): '''读写异常''' return 0x04 @staticmethod def PackCommandToTcp( value, id ): '''将modbus指令打包成Modbus-Tcp指令''' buffer = bytearray( len(value) + 6) buffer[0:2] = struct.pack('>H',id) buffer[4:6] = struct.pack('>H',len(value)) buffer[6:len(buffer)] = value return buffer @staticmethod def GetDescriptionByErrorCode( code ): '''通过错误码来获取到对应的文本消息''' if code == 0x01: return StringResources.ModbusTcpFunctionCodeNotSupport() elif code == 0x02: return StringResources.ModbusTcpFunctionCodeOverBound() elif code == 0x03: return StringResources.ModbusTcpFunctionCodeQuantityOver() elif code == 0x04: return StringResources.ModbusTcpFunctionCodeReadWriteException() else: return StringResources.UnknownError @staticmethod def AnalysisReadAddress( address, isStartWithZero ): '''分析Modbus协议的地址信息,该地址适应于tcp及rtu模式''' try: mAddress = ModbusAddress(address) if isStartWithZero == False: if mAddress.Address < 1: raise RuntimeError(StringResources.ModbusAddressMustMoreThanOne()) else: mAddress.Address = mAddress.Address - 1 return OperateResult.CreateSuccessResult(mAddress) except Exception as ex: return OperateResult( msg = str(ex)) class ModbusAddress(DeviceAddressBase): '''Modbus协议的地址类''' Station = 0 Function = ModbusInfo.ReadRegister() def __init__(self, address = "0"): self.Station = -1 self.Function = ModbusInfo.ReadRegister() self.Address = 0 self.AnalysisAddress(address) def AnalysisAddress( self, address = "0" ): '''解析Modbus的地址码''' if address.find(';')>=0: listAddress = address.split(";") for index in range(len(listAddress)): if listAddress[index][0] == 's' or listAddress[index][0] == 'S': self.Station = int(listAddress[index][2:]) elif listAddress[index][0] == 'x' or listAddress[index][0] == 'X': self.Function = int(listAddress[index][2:]) else: self.Address = int(listAddress[index]) else: self.Address = int(address) def CreateReadCoils( self, station, length ): '''创建一个读取线圈的字节对象''' buffer = bytearray(6) if self.Station < 0 : buffer[0] = station else: buffer[0] = self.Station buffer[1] = ModbusInfo.ReadCoil() buffer[2:4] = struct.pack('>H', self.Address) buffer[4:6] = struct.pack('>H', length) return buffer def CreateReadDiscrete( self, station, length ): '''创建一个读取离散输入的字节对象''' buffer = bytearray(6) if self.Station < 0 : buffer[0] = station else: buffer[0] = self.Station buffer[1] = ModbusInfo.ReadDiscrete() buffer[2:4] = struct.pack('>H', self.Address) buffer[4:6] = struct.pack('>H', length) return buffer def CreateReadRegister( self, station, length ): '''创建一个读取寄存器的字节对象''' buffer = bytearray(6) if self.Station < 0 : buffer[0] = station else: buffer[0] = self.Station buffer[1] = self.Function buffer[2:4] = struct.pack('>H', self.Address) buffer[4:6] = struct.pack('>H', length) return buffer def CreateReadInputRegister( self, station, length ): '''创建一个读取寄存器的字节对象''' buffer = bytearray(6) if self.Station < 0 : buffer[0] = station else: buffer[0] = self.Station buffer[1] = ModbusInfo.ReadInputRegister() buffer[2:4] = struct.pack('>H', self.Address) buffer[4:6] = struct.pack('>H', length) return buffer def CreateWriteOneCoil(self, station, value): '''创建一个写入单个线圈的指令''' buffer = bytearray(6) if self.Station < 0 : buffer[0] = station else: buffer[0] = self.Station buffer[1] = ModbusInfo.WriteOneCoil() buffer[2:4] = struct.pack('>H', self.Address) if value == True: buffer[4] = 0xFF return buffer def CreateWriteOneRegister(self, station, values): '''创建一个写入单个寄存器的指令''' buffer = bytearray(6) if self.Station < 0 : buffer[0] = station else: buffer[0] = self.Station buffer[1] = ModbusInfo.WriteOneRegister() buffer[2:4] = struct.pack('>H', self.Address) buffer[4:6] = values return buffer def CreateWriteCoil(self, station, values): '''创建一个写入批量线圈的指令''' data = SoftBasic.BoolArrayToByte( values ) buffer = bytearray(7 + len(data)) if self.Station < 0 : buffer[0] = station else: buffer[0] = self.Station buffer[1] = ModbusInfo.WriteCoil() buffer[2:4] = struct.pack('>H', self.Address) buffer[4:6] = struct.pack('>H', len(values)) buffer[6] = len(data) buffer[7:len(buffer)] = data return buffer def CreateWriteRegister(self, station, values): '''创建一个写入批量寄存器的指令''' buffer = bytearray(7 + len(values)) if self.Station < 0 : buffer[0] = station else: buffer[0] = self.Station buffer[1] = ModbusInfo.WriteRegister() buffer[2:4] = struct.pack('>H', self.Address) buffer[4:6] = struct.pack('>H', len(values)//2) buffer[6] = len(values) buffer[7:len(buffer)] = values return buffer def AddressAdd(self, value): '''地址新增指定的数''' modbusAddress = ModbusAddress() modbusAddress.Station = self.Station modbusAddress.Function = self.Function modbusAddress.Address = self.Address+value return modbusAddress class ModbusTcpNet(NetworkDeviceBase): '''Modbus-Tcp协议的客户端通讯类,方便的和服务器进行数据交互''' station = 1 softIncrementCount = None isAddressStartWithZero = True def __init__(self, ipAddress = '127.0.0.1', port = 502, station = 1): '''实例化一个MOdbus-Tcp协议的客户端对象''' self.WordLength = 1 self.softIncrementCount = SoftIncrementCount( 65536, 0 ) self.station = station self.ipAddress = ipAddress self.port = port self.byteTransform = ReverseWordTransform() self.iNetMessage = ModbusTcpMessage() def SetDataFormat( self, value ): '''多字节的数据是否高低位反转,该设置的改变会影响Int32,UInt32,float,double,Int64,UInt64类型的读写''' self.byteTransform.DataFormat = value def GetDataFormat( self ): '''多字节的数据是否高低位反转,该设置的改变会影响Int32,UInt32,float,double,Int64,UInt64类型的读写''' return self.byteTransform.DataFormat def SetIsStringReverse( self, value ): '''字符串数据是否按照字来反转''' self.byteTransform.IsStringReverse = value def GetIsStringReverse( self ): '''字符串数据是否按照字来反转''' return self.byteTransform.IsStringReverse def BuildReadCoilCommand(self, address, length): '''生成一个读取线圈的指令头''' # 分析地址 analysis = ModbusInfo.AnalysisReadAddress( address, self.isAddressStartWithZero ) if analysis.IsSuccess == False: return OperateResult.CreateFailedResult(analysis) # 获取消息号 messageId = self.softIncrementCount.GetCurrentValue() #生成最终的指令 buffer = ModbusInfo.PackCommandToTcp(analysis.Content.CreateReadCoils( self.station, length ), messageId) return OperateResult.CreateSuccessResult(buffer) def BuildReadDiscreteCommand(self, address, length): '''生成一个读取离散信息的指令头''' # 分析地址 analysis = ModbusInfo.AnalysisReadAddress( address, self.isAddressStartWithZero ) if analysis.IsSuccess == False: return OperateResult.CreateFailedResult(analysis) # 获取消息号 messageId = self.softIncrementCount.GetCurrentValue() buffer = ModbusInfo.PackCommandToTcp(analysis.Content.CreateReadDiscrete(self.station,length), messageId) return OperateResult.CreateSuccessResult(buffer) def BuildReadRegisterCommand(self, address, length): '''创建一个读取寄存器的字节对象''' analysis = ModbusInfo.AnalysisReadAddress( address, self.isAddressStartWithZero ) if analysis.IsSuccess == False: return OperateResult.CreateFailedResult(analysis) # 获取消息号 messageId = self.softIncrementCount.GetCurrentValue() buffer = ModbusInfo.PackCommandToTcp(analysis.Content.CreateReadRegister(self.station,length), messageId) return OperateResult.CreateSuccessResult(buffer) def BuildReadInputRegisterCommand(self, address, length): '''创建一个读取寄存器的字节对象''' analysis = ModbusInfo.AnalysisReadAddress( address, self.isAddressStartWithZero ) if analysis.IsSuccess == False: return OperateResult.CreateFailedResult(analysis) # 获取消息号 messageId = self.softIncrementCount.GetCurrentValue() buffer = ModbusInfo.PackCommandToTcp(analysis.Content.CreateReadInputRegister(self.station,length), messageId) return OperateResult.CreateSuccessResult(buffer) def BuildWriteOneCoilCommand(self, address,value): '''生成一个写入单线圈的指令头''' analysis = ModbusInfo.AnalysisReadAddress( address, self.isAddressStartWithZero ) if analysis.IsSuccess == False: return OperateResult.CreateFailedResult(analysis) # 获取消息号 messageId = self.softIncrementCount.GetCurrentValue() buffer = ModbusInfo.PackCommandToTcp(analysis.Content.CreateWriteOneCoil(self.station,value), messageId) return OperateResult.CreateSuccessResult(buffer) def BuildWriteOneRegisterCommand(self, address, values): '''生成一个写入单个寄存器的报文''' analysis = ModbusInfo.AnalysisReadAddress( address, self.isAddressStartWithZero ) if analysis.IsSuccess == False: return OperateResult.CreateFailedResult(analysis) # 获取消息号 messageId = self.softIncrementCount.GetCurrentValue() buffer = ModbusInfo.PackCommandToTcp(analysis.Content.CreateWriteOneRegister(self.station,values), messageId) return OperateResult.CreateSuccessResult(buffer) def BuildWriteCoilCommand(self, address, values): '''生成批量写入单个线圈的报文信息,需要传入bool数组信息''' analysis = ModbusInfo.AnalysisReadAddress( address, self.isAddressStartWithZero ) if analysis.IsSuccess == False: return OperateResult.CreateFailedResult(analysis) # 获取消息号 messageId = self.softIncrementCount.GetCurrentValue() buffer = ModbusInfo.PackCommandToTcp(analysis.Content.CreateWriteCoil(self.station,values), messageId) return OperateResult.CreateSuccessResult(buffer) def BuildWriteRegisterCommand(self, address, values): '''生成批量写入寄存器的报文信息,需要传入byte数组''' analysis = ModbusInfo.AnalysisReadAddress( address, self.isAddressStartWithZero ) if analysis.IsSuccess == False: return OperateResult.CreateFailedResult(analysis) # 获取消息号 messageId = self.softIncrementCount.GetCurrentValue() buffer = ModbusInfo.PackCommandToTcp(analysis.Content.CreateWriteRegister(self.station,values), messageId) return OperateResult.CreateSuccessResult(buffer) def BuildReadModbusAddressCommand( self, address, length ): '''生成一个读取寄存器的指令头,address->ModbusAddress''' # 获取消息号 messageId = self.softIncrementCount.GetCurrentValue() # 生成最终tcp指令 buffer = ModbusInfo.PackCommandToTcp( address.CreateReadRegister( self.station, length ), messageId ) #print(buffer) return OperateResult.CreateSuccessResult( buffer ) def CheckModbusTcpResponse( self, send ): '''检查当前的Modbus-Tcp响应是否是正确的''' resultBytes = self.ReadFromCoreServer( send ) if resultBytes.IsSuccess == True: if (send[7] + 0x80) == resultBytes.Content[7]: # 发生了错误 resultBytes.IsSuccess = False resultBytes.Message = ModbusInfo.GetDescriptionByErrorCode( resultBytes.Content[8] ) resultBytes.ErrorCode = resultBytes.Content[8] return resultBytes def ReadModBusBase( self, code, address, length ): '''检查当前的Modbus-Tcp响应是否是正确的''' command = None if code == ModbusInfo.ReadCoil(): command = self.BuildReadCoilCommand( address, length ) elif code == ModbusInfo.ReadDiscrete(): command = self.BuildReadDiscreteCommand( address, length ) elif code == ModbusInfo.ReadRegister(): command = self.BuildReadRegisterCommand( address, length ) elif code == ModbusInfo.ReadInputRegister(): command = self.BuildReadInputRegisterCommand( address, length ) else: command = OperateResult( msg = StringResources.ModbusTcpFunctionCodeNotSupport() ) if command.IsSuccess == False : return OperateResult.CreateFailedResult( command ) resultBytes = self.CheckModbusTcpResponse( command.Content ) if resultBytes.IsSuccess == True: # 二次数据处理 if len(resultBytes.Content) >= 9: buffer = bytearray(len(resultBytes.Content) - 9) buffer[0:len(buffer)] = resultBytes.Content[9:] resultBytes.Content = buffer return resultBytes def ReadModBusAddressBase( self, address, length = 1 ): '''读取服务器的数据,需要指定不同的功能码''' command = self.BuildReadModbusAddressCommand( address, length ) if command.IsSuccess == False: return OperateResult.CreateFailedResult(command) resultBytes = self.CheckModbusTcpResponse( command.Content ) if resultBytes.IsSuccess == True: # 二次数据处理 if len(resultBytes.Content) >= 9: buffer = bytearray(len(resultBytes.Content) - 9) buffer[0:len(buffer)] = resultBytes.Content[9:] resultBytes.Content = buffer return resultBytes def ReadCoil( self, address, length = None): '''批量的读取线圈,需要指定起始地址,读取长度可选''' if length == None: read = self.ReadCoil( address, 1 ) if read.IsSuccess == False : return OperateResult.CreateFailedResult( read ) return OperateResult.CreateSuccessResult( read.Content[0] ) else: read = self.ReadModBusBase( ModbusInfo.ReadCoil(), address, length ) if read.IsSuccess == False : return OperateResult.CreateFailedResult( read ) return OperateResult.CreateSuccessResult( SoftBasic.ByteToBoolArray( read.Content, length ) ) def ReadDiscrete( self, address, length = None): '''批量的读取输入点,需要指定起始地址,可选读取长度''' if length == None: read = self.ReadDiscrete( address, 1 ) if read.IsSuccess == False : return OperateResult.CreateFailedResult( read ) return OperateResult.CreateSuccessResult( read.Content[0] ) else: read = self.ReadModBusBase( ModbusInfo.ReadDiscrete(), address, length ) if read.IsSuccess == False : return OperateResult.CreateFailedResult( read ) return OperateResult.CreateSuccessResult( SoftBasic.ByteToBoolArray( read.Content, length ) ) def Read( self, address, length ): '''从Modbus服务器批量读取寄存器的信息,需要指定起始地址,读取长度''' analysis = ModbusInfo.AnalysisReadAddress( address, self.isAddressStartWithZero ) if analysis.IsSuccess == False : return OperateResult.CreateFailedResult( analysis ) return self.ReadModBusAddressBase( analysis.Content, length ) def WriteOneRegister( self, address, value ): '''写一个寄存器数据''' if type(value) == list: command = self.BuildWriteOneRegisterCommand( address, value ) if command.IsSuccess == False : return command return self.CheckModbusTcpResponse( command.Content ) else: return self.WriteOneRegister(address, struct.pack('>H', value)) def Write( self, address, value ): '''将数据写入到Modbus的寄存器上去,需要指定起始地址和数据内容''' command = self.BuildWriteRegisterCommand( address, value ) if command.IsSuccess == False: return command return self.CheckModbusTcpResponse( command.Content ) def WriteCoil( self, address, value ): '''批量写线圈信息,指定是否通断''' if type(value) == list: command = self.BuildWriteCoilCommand( address, value ) if command.IsSuccess == False : return command return self.CheckModbusTcpResponse( command.Content ) else: command = self.BuildWriteOneCoilCommand( address, value ) if command.IsSuccess == False : return command return self.CheckModbusTcpResponse( command.Content ) def WriteBool( self, address, values ): '''批量写寄存器的数据内容''' return self.Write( address, SoftBasic.BoolArrayToByte( values ) ) # 三菱的类库 class MelsecA1EDataType: '''三菱PLC的数据类型,此处包含了几个常用的类型''' DataCode = bytearray(2) DataType = 0 AsciiCode = 0 FromBase = 0 def __init__(self, code0, code1, typeCode, asciiCode, fromBase): '''如果您清楚类型代号,可以根据值进行扩展''' self.DataCode[0] = code0 self.DataCode[1] = code1 self.AsciiCode = asciiCode self.FromBase = fromBase if typeCode < 2: self.DataType = typeCode @staticmethod def GetX(): '''X输入寄存器''' return MelsecA1EDataType(0x58,0x20,0x01,'X*',8) @staticmethod def GetY(): '''Y输出寄存器''' return MelsecA1EDataType(0x59,0x20,0x01,'Y*',8) @staticmethod def GetM(): '''M中间寄存器''' return MelsecA1EDataType(0x4D,0x20,0x01,'M*',10) @staticmethod def GetS(): '''S状态寄存器''' return MelsecA1EDataType(0x53,0x20,0x01,'S*',10) @staticmethod def GetD(): '''D数据寄存器''' return MelsecA1EDataType(0x44,0x20,0x00,'D*',10) @staticmethod def GetR(): '''R文件寄存器''' return MelsecA1EDataType(0x52,0x20,0x00,'R*',10) class MelsecMcDataType: '''三菱PLC的数据类型,此处包含了几个常用的类型''' DataCode = 0 DataType = 0 AsciiCode = 0 FromBase = 0 def __init__(self, code, typeCode, asciiCode, fromBase): '''如果您清楚类型代号,可以根据值进行扩展''' self.DataCode = code self.AsciiCode = asciiCode self.FromBase = fromBase if typeCode < 2: self.DataType = typeCode @staticmethod def GetX(): '''X输入寄存器''' return MelsecMcDataType(0x9C,0x01,'X*',16) @staticmethod def GetY(): '''Y输出寄存器''' return MelsecMcDataType(0x9D,0x01,'Y*',16) @staticmethod def GetM(): '''M中间寄存器''' return MelsecMcDataType(0x90,0x01,'M*',10) @staticmethod def GetD(): '''D数据寄存器''' return MelsecMcDataType(0xA8,0x00,'D*',10) @staticmethod def GetW(): '''W链接寄存器''' return MelsecMcDataType(0xB4,0x00,'W*',16) @staticmethod def GetL(): '''L锁存继电器''' return MelsecMcDataType(0x92,0x01,'L*',10) @staticmethod def GetF(): '''F报警器''' return MelsecMcDataType(0x93,0x01,'F*',10) @staticmethod def GetV(): '''V边沿继电器''' return MelsecMcDataType(0x93,0x01,'V*',10) @staticmethod def GetB(): '''B链接继电器''' return MelsecMcDataType(0xA,0x01,'B*',16) @staticmethod def GetR(): '''R文件寄存器''' return MelsecMcDataType(0xAF,0x00,'R*',10) @staticmethod def GetS(): '''S步进继电器''' return MelsecMcDataType(0x98,0x01,'S*',10) @staticmethod def GetZ(): '''变址寄存器''' return MelsecMcDataType(0xCC,0x00,'Z*',10) @staticmethod def GetT(): '''定时器的值''' return MelsecMcDataType(0xC2,0x00,'TN',10) @staticmethod def GetC(): '''计数器的值''' return MelsecMcDataType(0xC5,0x00,'CN',10) class MelsecHelper: '''所有三菱通讯类的通用辅助工具类,包含了一些通用的静态方法,可以使用本类来获取一些原始的报文信息。详细的操作参见例子''' @staticmethod def McA1EAnalysisAddress( address = "0" ): result = OperateResult() try: if address.startswith("X") or address.startswith("x"): result.Content1 = MelsecA1EDataType.GetX() result.Content2 = int(address[1:], MelsecA1EDataType.GetX().FromBase) elif address.startswith("Y") or address.startswith("y"): result.Content1 = MelsecA1EDataType.GetY() result.Content2 = int(address[1:], MelsecA1EDataType.GetY().FromBase) elif address.startswith("M") or address.startswith("m"): result.Content1 = MelsecA1EDataType.GetM() result.Content2 = int(address[1:], MelsecA1EDataType.GetM().FromBase) elif address.startswith("S") or address.startswith("s"): result.Content1 = MelsecA1EDataType.GetS() result.Content2 = int(address[1:], MelsecA1EDataType.GetS().FromBase) elif address.startswith("D") or address.startswith("d"): result.Content1 = MelsecA1EDataType.GetD() result.Content2 = int(address[1:], MelsecA1EDataType.GetD().FromBase) elif address.startswith("R") or address.startswith("r"): result.Content1 = MelsecA1EDataType.GetR() result.Content2 = int(address[1:], MelsecA1EDataType.GetR().FromBase) else: raise Exception("type not supported!") except Exception as ex: result.Message = str(ex) return result result.IsSuccess = True result.Message = StringResources.SuccessText() return result @staticmethod def McAnalysisAddress( address = "0" ): result = OperateResult() try: if address.startswith("M") or address.startswith("m"): result.Content1 = MelsecMcDataType.GetM() result.Content2 = int(address[1:], MelsecMcDataType.GetM().FromBase) elif address.startswith("X") or address.startswith("x"): result.Content1 = MelsecMcDataType.GetX() result.Content2 = int(address[1:], MelsecMcDataType.GetX().FromBase) elif address.startswith("Y") or address.startswith("y"): result.Content1 = MelsecMcDataType.GetY() result.Content2 = int(address[1:], MelsecMcDataType.GetY().FromBase) elif address.startswith("D") or address.startswith("d"): result.Content1 = MelsecMcDataType.GetD() result.Content2 = int(address[1:], MelsecMcDataType.GetD().FromBase) elif address.startswith("W") or address.startswith("w"): result.Content1 = MelsecMcDataType.GetW() result.Content2 = int(address[1:], MelsecMcDataType.GetW().FromBase) elif address.startswith("L") or address.startswith("l"): result.Content1 = MelsecMcDataType.GetL() result.Content2 = int(address[1:], MelsecMcDataType.GetL().FromBase) elif address.startswith("F") or address.startswith("f"): result.Content1 = MelsecMcDataType.GetF() result.Content2 = int(address[1:], MelsecMcDataType.GetF().FromBase) elif address.startswith("V") or address.startswith("v"): result.Content1 = MelsecMcDataType.GetV() result.Content2 = int(address[1:], MelsecMcDataType.GetV().FromBase) elif address.startswith("B") or address.startswith("b"): result.Content1 = MelsecMcDataType.GetB() result.Content2 = int(address[1:], MelsecMcDataType.GetB().FromBase) elif address.startswith("R") or address.startswith("r"): result.Content1 = MelsecMcDataType.GetR() result.Content2 = int(address[1:], MelsecMcDataType.GetR().FromBase) elif address.startswith("S") or address.startswith("s"): result.Content1 = MelsecMcDataType.GetS() result.Content2 = int(address[1:], MelsecMcDataType.GetS().FromBase) elif address.startswith("Z") or address.startswith("z"): result.Content1 = MelsecMcDataType.GetZ() result.Content2 = int(address[1:], MelsecMcDataType.GetZ().FromBase) elif address.startswith("T") or address.startswith("t"): result.Content1 = MelsecMcDataType.GetT() result.Content2 = int(address[1:], MelsecMcDataType.GetT().FromBase) elif address.startswith("C") or address.startswith("c"): result.Content1 = MelsecMcDataType.GetC() result.Content2 = int(address[1:], MelsecMcDataType.GetC().FromBase) else: raise Exception("type not supported!") except Exception as ex: result.Message = str(ex) return result result.IsSuccess = True result.Message = StringResources.SuccessText() return result @staticmethod def BuildBytesFromData( value, length = None ): '''从数据构建一个ASCII格式地址字节''' if length == None: return ('{:02X}'.format(value)).encode('ascii') else: return (('{:0'+ str(length) +'X}').format(value)).encode('ascii') @staticmethod def BuildBytesFromAddress( address, dataType ): '''从三菱的地址中构建MC协议的6字节的ASCII格式的地址''' if dataType.FromBase == 10: return ('{:06d}'.format(address)).encode('ascii') else: return ('{:06X}'.format(address)).encode('ascii') @staticmethod def FxCalculateCRC( data ): '''计算Fx协议指令的和校验信息''' sum = 0 index = 1 while index < (len(data) - 2): sum += data[index] index=index+1 return MelsecHelper.BuildBytesFromData( sum ) @staticmethod def CheckCRC( data ): '''检查指定的和校验是否是正确的''' crc = MelsecHelper.FxCalculateCRC( data ) if (crc[0] != data[data.Length - 2]) : return False if (crc[1] != data[data.Length - 1]) : return False return True class MelsecA1ENet(NetworkDeviceBase): '''三菱PLC通讯协议,采用A兼容1E帧协议实现,使用二进制码通讯,请根据实际型号来进行选取''' PLCNumber = 0xFF def __init__(self,ipAddress= "127.0.0.1",port = 0): '''实例化一个三菱的A兼容1E帧协议的通讯对象''' self.iNetMessage = MelsecA1EBinaryMessage() self.byteTransform = RegularByteTransform() self.ipAddress = ipAddress self.port = port self.WordLength = 1 @staticmethod def BuildReadCommand(address,length,plcNumber): '''根据类型地址长度确认需要读取的指令头''' analysis = MelsecHelper.McA1EAnalysisAddress( address ) if analysis.IsSuccess == False : return OperateResult.CreateFailedResult( analysis ) subtitle = 0 if analysis.Content1.DataType == 0x01: subtitle = 0x00 else: subtitle = 0x01 _PLCCommand = bytearray(12) _PLCCommand[0] = subtitle # 副标题 _PLCCommand[1] = plcNumber # PLC编号 _PLCCommand[2] = 0x0A # CPU监视定时器(L)这里设置为0x00,0x0A,等待CPU返回的时间为10*250ms=2.5秒 _PLCCommand[3] = 0x00 # CPU监视定时器(H) _PLCCommand[4] = analysis.Content2 % 256 # 起始软元件(开始读取的地址) _PLCCommand[5] = analysis.Content2 // 256 _PLCCommand[6] = 0x00 _PLCCommand[7] = 0x00 _PLCCommand[8] = analysis.Content1.DataCode[1] # 软元件代码(L) _PLCCommand[9] = analysis.Content1.DataCode[0] # 软元件代码(H) _PLCCommand[10] = length % 256 # 软元件点数 _PLCCommand[11] = 0x00 return OperateResult.CreateSuccessResult( _PLCCommand ) @staticmethod def BuildWriteCommand( address,value,plcNumber): '''根据类型地址以及需要写入的数据来生成指令头''' analysis = MelsecHelper.McA1EAnalysisAddress( address ) if analysis.IsSuccess == False : return OperateResult.CreateFailedResult( analysis ) length = -1 if analysis.Content1.DataType == 1: # 按照位写入的操作,数据需要重新计算 length2 = len(value) // 2 + 1 if len(value) % 2 == 0 : length2 = len(value) // 2 buffer = bytearray(length2) for i in range(length2): if value[i * 2 + 0] != 0x00 : buffer[i] += 0x10 if (i * 2 + 1) < len(value) : if value[i * 2 + 1] != 0x00 : buffer[i] += 0x01 length = len(value) value = buffer subtitle = 0 if analysis.Content1.DataType == 0x01: subtitle = 0x02 else: subtitle = 0x03 _PLCCommand = bytearray(12 + len(value)) _PLCCommand[0] = subtitle # 副标题 _PLCCommand[1] = plcNumber # PLC编号 _PLCCommand[2] = 0x0A # CPU监视定时器(L)这里设置为0x00,0x0A,等待CPU返回的时间为10*250ms=2.5秒 _PLCCommand[3] = 0x00 # CPU监视定时器(H) _PLCCommand[4] = analysis.Content2 % 256 # 起始软元件(开始读取的地址) _PLCCommand[5] = analysis.Content2 // 256 _PLCCommand[6] = 0x00 _PLCCommand[7] = 0x00 _PLCCommand[8] = analysis.Content1.DataCode[1] # 软元件代码(L) _PLCCommand[9] = analysis.Content1.DataCode[0] # 软元件代码(H) _PLCCommand[10] = length % 256 # 软元件点数 _PLCCommand[11] = 0x00 # 判断是否进行位操作 if analysis.Content1.DataType == 1: if length > 0: _PLCCommand[10] = length % 256 # 软元件点数 else: _PLCCommand[10] = len(value) * 2 % 256 # 软元件点数 else: _PLCCommand[10] = len(value) // 2 % 256 # 软元件点数 _PLCCommand[12:] = value return OperateResult.CreateSuccessResult( _PLCCommand ) @staticmethod def ExtractActualData( response, isBit ): ''' 从PLC反馈的数据中提取出实际的数据内容,需要传入反馈数据,是否位读取''' if isBit == True: # 位读取 Content = bytearray((len(response) - 2) * 2) i = 2 while i < len(response): if (response[i] & 0x10) == 0x10: Content[(i - 2) * 2 + 0] = 0x01 if (response[i] & 0x01) == 0x01: Content[(i - 2) * 2 + 1] = 0x01 i = i + 1 return OperateResult.CreateSuccessResult( Content ) else: # 字读取 return OperateResult.CreateSuccessResult( response[2:] ) def Read( self, address, length ): '''从三菱PLC中读取想要的数据,返回读取结果''' # 获取指令 command = MelsecA1ENet.BuildReadCommand( address, length, self.PLCNumber ) if command.IsSuccess == False : return OperateResult.CreateFailedResult( command ) # 核心交互 read = self.ReadFromCoreServer( command.Content ) if read.IsSuccess == False : return OperateResult.CreateFailedResult( read ) # 错误代码验证 errorCode = read.Content[1] if errorCode != 0 : return OperateResult(err=errorCode, msg=StringResources.MelsecPleaseReferToManulDocument()) # 数据解析,需要传入是否使用位的参数 return MelsecA1ENet.ExtractActualData( read.Content, command.Content[0] == 0x00 ) def ReadBool( self, address, length = None ): '''从三菱PLC中批量读取位软元件,返回读取结果''' if length == None: read = self.ReadBool(address,1) if read.IsSuccess == False: return OperateResult.CreateFailedResult(read) else: return OperateResult.CreateSuccessResult(read.Content[0]) else: # 解析地址 analysis = MelsecHelper.McA1EAnalysisAddress( address ) if analysis.IsSuccess == False : return OperateResult.CreateFailedResult( analysis ) # 位读取校验 if analysis.Content1.DataType == 0x00 : return OperateResult( msg = StringResources.MelsecReadBitInfo() ) # 核心交互 read = self.Read( address, length ) if read.IsSuccess == False : return OperateResult.CreateFailedResult( read ) # 转化bool数组 content = [] for i in range(length): if read.Content[i] == 0x01: content.append(True) else: content.append(False) return OperateResult.CreateSuccessResult( content ) def Write( self, address, value ): '''向PLC写入数据,数据格式为原始的字节类型''' # 解析指令 command = MelsecA1ENet.BuildWriteCommand( address, value, self.PLCNumber ) if command.IsSuccess == False : return command # 核心交互 read = self.ReadFromCoreServer( command.Content ) if read.IsSuccess == False : return read # 错误码校验 errorCode = read.Content[1] if errorCode != 0 : return OperateResult(err=errorCode, msg=StringResources.MelsecPleaseReferToManulDocument()) # 成功 return OperateResult.CreateSuccessResult( ) def WriteBool( self, address, values ): '''向PLC中位软元件写入bool数组或是值,返回值说明,比如你写入M100,values[0]对应M100''' if type(values) == list: buffer = bytearray(len(values)) for i in range(len(values)): if values[i] == True: buffer[i] = 0x01 return self.Write(address, buffer) else: return self.Write(address,[values]) class MelsecMcNet(NetworkDeviceBase): '''三菱PLC通讯类,采用Qna兼容3E帧协议实现,需要在PLC侧先的以太网模块先进行配置,必须为二进制通讯''' NetworkNumber = 0 NetworkStationNumber = 0 def __init__(self,ipAddress= "127.0.0.1",port = 0): '''实例化一个三菱的Qna兼容3E帧协议的通讯对象''' self.iNetMessage = MelsecQnA3EBinaryMessage() self.byteTransform = RegularByteTransform() self.ipAddress = ipAddress self.port = port self.WordLength = 1 @staticmethod def BuildReadCommand(address,length,networkNumber = 0,networkStationNumber = 0): '''根据类型地址长度确认需要读取的指令头''' analysis = MelsecHelper.McAnalysisAddress( address ) if analysis.IsSuccess == False : return OperateResult.CreateFailedResult( analysis ) _PLCCommand = bytearray(21) _PLCCommand[0] = 0x50 # 副标题 _PLCCommand[1] = 0x00 _PLCCommand[2] = networkNumber # 网络号 _PLCCommand[3] = 0xFF # PLC编号 _PLCCommand[4] = 0xFF # 目标模块IO编号 _PLCCommand[5] = 0x03 _PLCCommand[6] = networkStationNumber # 目标模块站号 _PLCCommand[7] = 0x0C # 请求数据长度 _PLCCommand[8] = 0x00 _PLCCommand[9] = 0x0A # CPU监视定时器 _PLCCommand[10] = 0x00 _PLCCommand[11] = 0x01 # 批量读取数据命令 _PLCCommand[12] = 0x04 _PLCCommand[13] = analysis.Content1.DataType # 以点为单位还是字为单位成批读取 _PLCCommand[14] = 0x00 _PLCCommand[15] = analysis.Content2 % 256 # 起始地址的地位 _PLCCommand[16] = analysis.Content2 // 256 _PLCCommand[17] = 0x00 _PLCCommand[18] = analysis.Content1.DataCode # 指明读取的数据 _PLCCommand[19] = length % 256 # 软元件长度的地位 _PLCCommand[20] = length // 256 return OperateResult.CreateSuccessResult( _PLCCommand ) @staticmethod def BuildWriteCommand( address, value, networkNumber = 0, networkStationNumber = 0 ): '''根据类型地址以及需要写入的数据来生成指令头''' analysis = MelsecHelper.McAnalysisAddress( address ) if analysis.IsSuccess == False : return OperateResult.CreateFailedResult( analysis ) length = -1 if analysis.Content1.DataType == 1: # 按照位写入的操作,数据需要重新计算 length2 = len(value) // 2 + 1 if len(value) % 2 == 0 : length2 = len(value) // 2 buffer = bytearray(length2) for i in range(length2): if value[i * 2 + 0] != 0x00 : buffer[i] += 0x10 if (i * 2 + 1) < len(value) : if value[i * 2 + 1] != 0x00 : buffer[i] += 0x01 length = len(value) value = buffer _PLCCommand = bytearray(21 + len(value)) _PLCCommand[0] = 0x50 # 副标题 _PLCCommand[1] = 0x00 _PLCCommand[2] = networkNumber # 网络号 _PLCCommand[3] = 0xFF # PLC编号 _PLCCommand[4] = 0xFF # 目标模块IO编号 _PLCCommand[5] = 0x03 _PLCCommand[6] = networkStationNumber # 目标模块站号 _PLCCommand[7] = (len(_PLCCommand) - 9) % 256 # 请求数据长度 _PLCCommand[8] = (len(_PLCCommand) - 9) // 256 _PLCCommand[9] = 0x0A # CPU监视定时器 _PLCCommand[10] = 0x00 _PLCCommand[11] = 0x01 # 批量读取数据命令 _PLCCommand[12] = 0x14 _PLCCommand[13] = analysis.Content1.DataType # 以点为单位还是字为单位成批读取 _PLCCommand[14] = 0x00 _PLCCommand[15] = analysis.Content2 % 256 # 起始地址的地位 _PLCCommand[16] = analysis.Content2 // 256 _PLCCommand[17] = 0x00 _PLCCommand[18] = analysis.Content1.DataCode # 指明写入的数据 # 判断是否进行位操作 if analysis.Content1.DataType == 1: if length > 0: _PLCCommand[19] = length % 256 # 软元件长度的地位 _PLCCommand[20] = length // 256 else: _PLCCommand[19] = len(value) * 2 % 256 # 软元件长度的地位 _PLCCommand[20] = len(value) * 2 // 256 else: _PLCCommand[19] = len(value) // 2 % 256 # 软元件长度的地位 _PLCCommand[20] = len(value) // 2 // 256 _PLCCommand[21:] = value return OperateResult.CreateSuccessResult( _PLCCommand ) @staticmethod def ExtractActualData( response, isBit ): ''' 从PLC反馈的数据中提取出实际的数据内容,需要传入反馈数据,是否位读取''' if isBit == True: # 位读取 Content = bytearray((len(response) - 11) * 2) i = 11 while i < len(response): if (response[i] & 0x10) == 0x10: Content[(i - 11) * 2 + 0] = 0x01 if (response[i] & 0x01) == 0x01: Content[(i - 11) * 2 + 1] = 0x01 i = i + 1 return OperateResult.CreateSuccessResult( Content ) else: # 字读取 Content = bytearray(len(response) - 11) Content[0:] = response[11:] return OperateResult.CreateSuccessResult( Content ) def Read( self, address, length ): '''从三菱PLC中读取想要的数据,返回读取结果''' # 获取指令 command = MelsecMcNet.BuildReadCommand( address, length, self.NetworkNumber, self.NetworkStationNumber ) if command.IsSuccess == False : return OperateResult.CreateFailedResult( command ) # 核心交互 read = self.ReadFromCoreServer( command.Content ) if read.IsSuccess == False : return OperateResult.CreateFailedResult( read ) # 错误代码验证 errorCode = read.Content[9] * 256 + read.Content[10] if errorCode != 0 : return OperateResult(err=errorCode, msg=StringResources.MelsecPleaseReferToManulDocument()) # 数据解析,需要传入是否使用位的参数 return MelsecMcNet.ExtractActualData( read.Content, command.Content[13] == 1 ) def ReadBool( self, address, length = None ): '''从三菱PLC中批量读取位软元件,返回读取结果''' if length == None: read = self.ReadBool(address,1) if read.IsSuccess == False: return OperateResult.CreateFailedResult(read) else: return OperateResult.CreateSuccessResult(read.Content[0]) else: # 解析地址 analysis = MelsecHelper.McAnalysisAddress( address ) if analysis.IsSuccess == False : return OperateResult.CreateFailedResult( analysis ) # 位读取校验 if analysis.Content1.DataType == 0x00 : return OperateResult( msg = StringResources.MelsecReadBitInfo() ) # 核心交互 read = self.Read( address, length ) if read.IsSuccess == False : return OperateResult.CreateFailedResult( read ) # 转化bool数组 content = [] for i in range(length): if read.Content[i] == 0x01: content.append(True) else: content.append(False) return OperateResult.CreateSuccessResult( content ) def Write( self, address, value ): '''向PLC写入数据,数据格式为原始的字节类型''' # 解析指令 command = MelsecMcNet.BuildWriteCommand( address, value, self.NetworkNumber, self.NetworkStationNumber ) if command.IsSuccess == False : return command # 核心交互 read = self.ReadFromCoreServer( command.Content ) if read.IsSuccess == False : return read # 错误码校验 errorCode = read.Content[9] * 256 + read.Content[10] if errorCode != 0 : return OperateResult(err=errorCode, msg=StringResources.MelsecPleaseReferToManulDocument()) # 成功 return OperateResult.CreateSuccessResult( ) def WriteBool( self, address, values ): '''向PLC中位软元件写入bool数组或是值,返回值说明,比如你写入M100,values[0]对应M100''' if type(values) == list: buffer = bytearray(len(values)) for i in range(len(values)): if values[i] == True: buffer[i] = 0x01 return self.Write(address, buffer) else: return self.WriteBool(address,[values]) class MelsecMcAsciiNet(NetworkDeviceBase): '''三菱PLC通讯类,采用Qna兼容3E帧协议实现,需要在PLC侧先的以太网模块先进行配置,必须为ASCII通讯格式''' NetworkNumber = 0 NetworkStationNumber = 0 def __init__(self,ipAddress= "127.0.0.1",port = 0): '''实例化一个三菱的Qna兼容3E帧协议的通讯对象''' self.iNetMessage = MelsecQnA3EAsciiMessage() self.byteTransform = RegularByteTransform() self.ipAddress = ipAddress self.port = port self.WordLength = 1 @staticmethod def BuildReadCommand( address, length, networkNumber = 0, networkStationNumber = 0 ): '''根据类型地址长度确认需要读取的报文''' analysis = MelsecHelper.McAnalysisAddress( address ) if analysis.IsSuccess == False : return OperateResult.CreateFailedResult( analysis ) # 默认信息----注意:高低字节交错 _PLCCommand = bytearray(42) _PLCCommand[ 0] = 0x35 # 副标题 _PLCCommand[ 1] = 0x30 _PLCCommand[ 2] = 0x30 _PLCCommand[ 3] = 0x30 _PLCCommand[ 4] = MelsecHelper.BuildBytesFromData( networkNumber )[0] # 网络号 _PLCCommand[ 5] = MelsecHelper.BuildBytesFromData( networkNumber )[1] _PLCCommand[ 6] = 0x46 # PLC编号 _PLCCommand[ 7] = 0x46 _PLCCommand[ 8] = 0x30 # 目标模块IO编号 _PLCCommand[ 9] = 0x33 _PLCCommand[10] = 0x46 _PLCCommand[11] = 0x46 _PLCCommand[12] = MelsecHelper.BuildBytesFromData( networkStationNumber )[0] # 目标模块站号 _PLCCommand[13] = MelsecHelper.BuildBytesFromData( networkStationNumber )[1] _PLCCommand[14] = 0x30 # 请求数据长度 _PLCCommand[15] = 0x30 _PLCCommand[16] = 0x31 _PLCCommand[17] = 0x38 _PLCCommand[18] = 0x30 # CPU监视定时器 _PLCCommand[19] = 0x30 _PLCCommand[20] = 0x31 _PLCCommand[21] = 0x30 _PLCCommand[22] = 0x30 # 批量读取数据命令 _PLCCommand[23] = 0x34 _PLCCommand[24] = 0x30 _PLCCommand[25] = 0x31 _PLCCommand[26] = 0x30 # 以点为单位还是字为单位成批读取 _PLCCommand[27] = 0x30 _PLCCommand[28] = 0x30 _PLCCommand[29] = 0x30 if analysis.Content1.DataType == 0 else 0x31 _PLCCommand[30] = analysis.Content1.AsciiCode.encode('ascii')[0] # 软元件类型 _PLCCommand[31] = analysis.Content1.AsciiCode.encode('ascii')[1] _PLCCommand[32:38] = MelsecHelper.BuildBytesFromAddress( analysis.Content2, analysis.Content1 ) # 起始地址的地位 _PLCCommand[38:42] = MelsecHelper.BuildBytesFromData( length, 4 ) # 软元件点数 return OperateResult.CreateSuccessResult( _PLCCommand ) @staticmethod def BuildWriteCommand( address, value, networkNumber = 0, networkStationNumber = 0 ): '''根据类型地址以及需要写入的数据来生成报文''' analysis = MelsecHelper.McAnalysisAddress( address ) if analysis.IsSuccess == False : return OperateResult.CreateFailedResult( analysis ) # 预处理指令 if analysis.Content1.DataType == 0x01: # 位写入 buffer = bytearray(len(value)) for i in range(len(buffer)): buffer[i] = 0x30 if value[i] == 0x00 else 0x31 value = buffer else: # 字写入 buffer = bytearray(len(value) * 2) for i in range(len(value) // 2): tmp = value[i*2]+ value[i*2+1]*256 buffer[4*i:4*i+4] = MelsecHelper.BuildBytesFromData( tmp, 4 ) value = buffer # 默认信息----注意:高低字节交错 _PLCCommand = bytearray(42 + len(value)) _PLCCommand[ 0] = 0x35 # 副标题 _PLCCommand[ 1] = 0x30 _PLCCommand[ 2] = 0x30 _PLCCommand[ 3] = 0x30 _PLCCommand[ 4] = MelsecHelper.BuildBytesFromData( networkNumber )[0] # 网络号 _PLCCommand[ 5] = MelsecHelper.BuildBytesFromData( networkNumber )[1] _PLCCommand[ 6] = 0x46 # PLC编号 _PLCCommand[ 7] = 0x46 _PLCCommand[ 8] = 0x30 # 目标模块IO编号 _PLCCommand[ 9] = 0x33 _PLCCommand[10] = 0x46 _PLCCommand[11] = 0x46 _PLCCommand[12] = MelsecHelper.BuildBytesFromData( networkStationNumber )[0] # 目标模块站号 _PLCCommand[13] = MelsecHelper.BuildBytesFromData( networkStationNumber )[1] _PLCCommand[14] = MelsecHelper.BuildBytesFromData( len(_PLCCommand) - 18, 4 )[0] # 请求数据长度 _PLCCommand[15] = MelsecHelper.BuildBytesFromData( len(_PLCCommand) - 18, 4 )[1] _PLCCommand[16] = MelsecHelper.BuildBytesFromData( len(_PLCCommand) - 18, 4 )[2] _PLCCommand[17] = MelsecHelper.BuildBytesFromData( len(_PLCCommand) - 18, 4 )[3] _PLCCommand[18] = 0x30 # CPU监视定时器 _PLCCommand[19] = 0x30 _PLCCommand[20] = 0x31 _PLCCommand[21] = 0x30 _PLCCommand[22] = 0x31 # 批量写入的命令 _PLCCommand[23] = 0x34 _PLCCommand[24] = 0x30 _PLCCommand[25] = 0x31 _PLCCommand[26] = 0x30 # 子命令 _PLCCommand[27] = 0x30 _PLCCommand[28] = 0x30 _PLCCommand[29] = 0x30 if analysis.Content1.DataType == 0 else 0x31 _PLCCommand[30] = analysis.Content1.AsciiCode.encode('ascii')[0] # 软元件类型 _PLCCommand[31] = analysis.Content1.AsciiCode.encode('ascii')[1] _PLCCommand[32] = MelsecHelper.BuildBytesFromAddress( analysis.Content2, analysis.Content1 )[0] # 起始地址的地位 _PLCCommand[33] = MelsecHelper.BuildBytesFromAddress( analysis.Content2, analysis.Content1 )[1] _PLCCommand[34] = MelsecHelper.BuildBytesFromAddress( analysis.Content2, analysis.Content1 )[2] _PLCCommand[35] = MelsecHelper.BuildBytesFromAddress( analysis.Content2, analysis.Content1 )[3] _PLCCommand[36] = MelsecHelper.BuildBytesFromAddress( analysis.Content2, analysis.Content1 )[4] _PLCCommand[37] = MelsecHelper.BuildBytesFromAddress( analysis.Content2, analysis.Content1 )[5] # 判断是否进行位操作 if (analysis.Content1.DataType == 1): _PLCCommand[38] = MelsecHelper.BuildBytesFromData( len(value), 4 )[0] # 软元件点数 _PLCCommand[39] = MelsecHelper.BuildBytesFromData( len(value), 4 )[1] _PLCCommand[40] = MelsecHelper.BuildBytesFromData( len(value), 4 )[2] _PLCCommand[41] = MelsecHelper.BuildBytesFromData( len(value), 4 )[3] else: _PLCCommand[38] = MelsecHelper.BuildBytesFromData( len(value) // 4, 4 )[0] # 软元件点数 _PLCCommand[39] = MelsecHelper.BuildBytesFromData( len(value) // 4, 4 )[1] _PLCCommand[40] = MelsecHelper.BuildBytesFromData( len(value) // 4, 4 )[2] _PLCCommand[41] = MelsecHelper.BuildBytesFromData( len(value) // 4, 4 )[3] _PLCCommand[42:] = value return OperateResult.CreateSuccessResult( _PLCCommand ) @staticmethod def ExtractActualData( response, isBit ): if isBit == True: # 位读取 Content = bytearray(len(response) - 22) for i in range(22,len(response)): Content[i - 22] = 0x00 if response[i] == 0x30 else 0x01 return OperateResult.CreateSuccessResult( Content ) else: # 字读取 Content = bytearray((len(response) - 22) // 2) for i in range(len(Content)//2): tmp = int(response[i * 4 + 22:i * 4 + 26].decode('ascii'),16) Content[i * 2:i * 2+2] = struct.pack('= 0: temp = address.split(".") return int(temp[0]) * 8 + int(temp[1]) else: return int( address ) * 8 @staticmethod def AnalysisAddress( address = 'M0' ): '''解析数据地址,解析出地址类型,起始地址,DB块的地址''' result = OperateResult( ) try: result.Content3 = 0 if address[0] == 'I': result.Content1 = 0x81 result.Content2 = SiemensS7Net.CalculateAddressStarted( address[1:] ) elif address[0] == 'Q': result.Content1 = 0x82 result.Content2 = SiemensS7Net.CalculateAddressStarted( address[1:] ) elif address[0] == 'M': result.Content1 = 0x83 result.Content2 = SiemensS7Net.CalculateAddressStarted( address[1:] ) elif address[0] == 'D' or address[0:2] == "DB": result.Content1 = 0x84 adds = address.split(".") if address[1] == 'B': result.Content3 = int( adds[0][2:] ) else: result.Content3 = int( adds[0][1:] ) result.Content2 = SiemensS7Net.CalculateAddressStarted( address[ (address.find( '.' ) + 1):]) elif address[0] == 'T': result.Content1 = 0x1D result.Content2 = SiemensS7Net.CalculateAddressStarted( address[1:] ) elif address[0] == 'C': result.Content1 = 0x1C result.Content2 = SiemensS7Net.CalculateAddressStarted( address[1:] ) elif address[0] == 'V': result.Content1 = 0x84 result.Content3 = 1 result.Content2 = SiemensS7Net.CalculateAddressStarted( address[1:] ) else: result.Message = StringResources.NotSupportedDataType() result.Content1 = 0 result.Content2 = 0 result.Content3 = 0 return result except Exception as ex: result.Message = str(ex) return result result.IsSuccess = True return result @staticmethod def BuildReadCommand( address, length ): '''生成一个读取字数据指令头的通用方法''' if address == None : raise Exception( "address" ) if length == None : raise Exception( "count" ) if len(address) != len(length) : raise Exception( "两个参数的个数不统一" ) if len(length) > 19 : raise Exception( "读取的数组数量不允许大于19" ) readCount = len(length) _PLCCommand = bytearray(19 + readCount * 12) # ====================================================================================== _PLCCommand[0] = 0x03 # 报文头 _PLCCommand[1] = 0x00 _PLCCommand[2] = len(_PLCCommand) // 256 # 长度 _PLCCommand[3] = len(_PLCCommand) % 256 _PLCCommand[4] = 0x02 # 固定 _PLCCommand[5] = 0xF0 _PLCCommand[6] = 0x80 _PLCCommand[7] = 0x32 # 协议标识 _PLCCommand[8] = 0x01 # 命令:发 _PLCCommand[9] = 0x00 # redundancy identification (reserved): 0x0000; _PLCCommand[10] = 0x00 # protocol data unit reference; it’s increased by request event; _PLCCommand[11] = 0x00 _PLCCommand[12] = 0x01 # 参数命令数据总长度 _PLCCommand[13] = (len(_PLCCommand) - 17) // 256 _PLCCommand[14] = (len(_PLCCommand) - 17) % 256 _PLCCommand[15] = 0x00 # 读取内部数据时为00,读取CPU型号为Data数据长度 _PLCCommand[16] = 0x00 # ===================================================================================== _PLCCommand[17] = 0x04 # 读写指令,04读,05写 _PLCCommand[18] = readCount # 读取数据块个数 for ii in range(readCount): #=========================================================================================== # 指定有效值类型 _PLCCommand[19 + ii * 12] = 0x12 # 接下来本次地址访问长度 _PLCCommand[20 + ii * 12] = 0x0A # 语法标记,ANY _PLCCommand[21 + ii * 12] = 0x10 # 按字为单位 _PLCCommand[22 + ii * 12] = 0x02 # 访问数据的个数 _PLCCommand[23 + ii * 12] = length[ii] // 256 _PLCCommand[24 + ii * 12] = length[ii] % 256 # DB块编号,如果访问的是DB块的话 _PLCCommand[25 + ii * 12] = address[ii].Content3 // 256 _PLCCommand[26 + ii * 12] = address[ii].Content3 % 256 # 访问数据类型 _PLCCommand[27 + ii * 12] = address[ii].Content1 # 偏移位置 _PLCCommand[28 + ii * 12] = address[ii].Content2 // 256 // 256 % 256 _PLCCommand[29 + ii * 12] = address[ii].Content2 // 256 % 256 _PLCCommand[30 + ii * 12] = address[ii].Content2 % 256 return OperateResult.CreateSuccessResult( _PLCCommand ) @staticmethod def BuildBitReadCommand( address ): '''生成一个位读取数据指令头的通用方法''' analysis = SiemensS7Net.AnalysisAddress( address ) if analysis.IsSuccess == False : return OperateResult.CreateFailedResult( analysis ) _PLCCommand = bytearray(31) # 报文头 _PLCCommand[0] = 0x03 _PLCCommand[1] = 0x00 # 长度 _PLCCommand[2] = len(_PLCCommand) // 256 _PLCCommand[3] = len(_PLCCommand) % 256 # 固定 _PLCCommand[4] = 0x02 _PLCCommand[5] = 0xF0 _PLCCommand[6] = 0x80 _PLCCommand[7] = 0x32 # 命令:发 _PLCCommand[8] = 0x01 # 标识序列号 _PLCCommand[9] = 0x00 _PLCCommand[10] = 0x00 _PLCCommand[11] = 0x00 _PLCCommand[12] = 0x01 # 命令数据总长度 _PLCCommand[13] = (len(_PLCCommand) - 17) // 256 _PLCCommand[14] = (len(_PLCCommand) - 17) % 256 _PLCCommand[15] = 0x00 _PLCCommand[16] = 0x00 # 命令起始符 _PLCCommand[17] = 0x04 # 读取数据块个数 _PLCCommand[18] = 0x01 #=========================================================================================== # 读取地址的前缀 _PLCCommand[19] = 0x12 _PLCCommand[20] = 0x0A _PLCCommand[21] = 0x10 # 读取的数据时位 _PLCCommand[22] = 0x01 # 访问数据的个数 _PLCCommand[23] = 0x00 _PLCCommand[24] = 0x01 # DB块编号,如果访问的是DB块的话 _PLCCommand[25] = analysis.Content3 // 256 _PLCCommand[26] = analysis.Content3 % 256 # 访问数据类型 _PLCCommand[27] = analysis.Content1 # 偏移位置 _PLCCommand[28] = analysis.Content2 // 256 // 256 % 256 _PLCCommand[29] = analysis.Content2 // 256 % 256 _PLCCommand[30] = analysis.Content2 % 256 return OperateResult.CreateSuccessResult( _PLCCommand ) @staticmethod def BuildWriteByteCommand( address, data ): '''生成一个写入字节数据的指令''' if data == None : data = bytearray(0) analysis = SiemensS7Net.AnalysisAddress( address ) if analysis.IsSuccess == False : return OperateResult.CreateFailedResult(analysis) _PLCCommand = bytearray(35 + len(data)) _PLCCommand[0] = 0x03 _PLCCommand[1] = 0x00 # 长度 _PLCCommand[2] = (35 + len(data)) // 256 _PLCCommand[3] = (35 + len(data)) % 256 # 固定 _PLCCommand[4] = 0x02 _PLCCommand[5] = 0xF0 _PLCCommand[6] = 0x80 _PLCCommand[7] = 0x32 # 命令 发 _PLCCommand[8] = 0x01 # 标识序列号 _PLCCommand[9] = 0x00 _PLCCommand[10] = 0x00 _PLCCommand[11] = 0x00 _PLCCommand[12] = 0x01 # 固定 _PLCCommand[13] = 0x00 _PLCCommand[14] = 0x0E # 写入长度+4 _PLCCommand[15] = (4 + len(data)) // 256 _PLCCommand[16] = (4 + len(data)) % 256 # 读写指令 _PLCCommand[17] = 0x05 # 写入数据块个数 _PLCCommand[18] = 0x01 # 固定,返回数据长度 _PLCCommand[19] = 0x12 _PLCCommand[20] = 0x0A _PLCCommand[21] = 0x10 # 写入方式,1是按位,2是按字 _PLCCommand[22] = 0x02 # 写入数据的个数 _PLCCommand[23] = len(data) // 256 _PLCCommand[24] = len(data) % 256 # DB块编号,如果访问的是DB块的话 _PLCCommand[25] = analysis.Content3 // 256 _PLCCommand[26] = analysis.Content3 % 256 # 写入数据的类型 _PLCCommand[27] = analysis.Content1 # 偏移位置 _PLCCommand[28] = analysis.Content2 // 256 // 256 % 256 _PLCCommand[29] = analysis.Content2 // 256 % 256 _PLCCommand[30] = analysis.Content2 % 256 # 按字写入 _PLCCommand[31] = 0x00 _PLCCommand[32] = 0x04 # 按位计算的长度 _PLCCommand[33] = len(data) * 8 // 256 _PLCCommand[34] = len(data) * 8 % 256 _PLCCommand[35:] = data return OperateResult.CreateSuccessResult(_PLCCommand) @staticmethod def BuildWriteBitCommand( address, data ): analysis = SiemensS7Net.AnalysisAddress( address ) if analysis.IsSuccess == False : return OperateResult.CreateFailedResult(analysis) buffer = bytearray(1) if data == True : buffer[0] = 0x01 _PLCCommand = bytearray(35 + len(buffer)) _PLCCommand[0] = 0x03 _PLCCommand[1] = 0x00 # 长度 _PLCCommand[2] = (35 + len(buffer)) // 256 _PLCCommand[3] = (35 + len(buffer)) % 256 # 固定 _PLCCommand[4] = 0x02 _PLCCommand[5] = 0xF0 _PLCCommand[6] = 0x80 _PLCCommand[7] = 0x32 # 命令 发 _PLCCommand[8] = 0x01 # 标识序列号 _PLCCommand[9] = 0x00 _PLCCommand[10] = 0x00 _PLCCommand[11] = 0x00 _PLCCommand[12] = 0x01 # 固定 _PLCCommand[13] = 0x00 _PLCCommand[14] = 0x0E # 写入长度+4 _PLCCommand[15] = (4 + len(buffer)) // 256 _PLCCommand[16] = (4 + len(buffer)) % 256 # 命令起始符 _PLCCommand[17] = 0x05 # 写入数据块个数 _PLCCommand[18] = 0x01 _PLCCommand[19] = 0x12 _PLCCommand[20] = 0x0A _PLCCommand[21] = 0x10 # 写入方式,1是按位,2是按字 _PLCCommand[22] = 0x01 # 写入数据的个数 _PLCCommand[23] = len(buffer) // 256 _PLCCommand[24] = len(buffer) % 256 # DB块编号,如果访问的是DB块的话 _PLCCommand[25] = analysis.Content3 // 256 _PLCCommand[26] = analysis.Content3 % 256 # 写入数据的类型 _PLCCommand[27] = analysis.Content1 # 偏移位置 _PLCCommand[28] = analysis.Content2 // 256 // 256 _PLCCommand[29] = analysis.Content2 // 256 _PLCCommand[30] = analysis.Content2 % 256 # 按位写入 _PLCCommand[31] = 0x00 _PLCCommand[32] = 0x03 # 按位计算的长度 _PLCCommand[33] = len(buffer) // 256 _PLCCommand[34] = len(buffer) % 256 _PLCCommand[35:] = buffer return OperateResult.CreateSuccessResult(_PLCCommand) def InitializationOnConnect( self, socket ): '''连接上服务器后需要进行的二次握手操作''' # msg = SoftBasic.ByteToHexString(self.plcHead1, ' ') # 第一次握手 read_first = self.ReadFromCoreServerBase( socket, self.plcHead1 ) if read_first.IsSuccess == False : return read_first # 第二次握手 read_second = self.ReadFromCoreServerBase( socket, self.plcHead2 ) if read_second.IsSuccess == False : return read_second # 返回成功的信号 return OperateResult.CreateSuccessResult( ) def ReadOrderNumber( self ): '''从PLC读取订货号信息''' read = self.ReadFromCoreServer( self.plcOrderNumber ) if read.IsSuccess == False : return OperateResult.CreateFailedResult( read ) return OperateResult.CreateSuccessResult( read.Content[71:92].decode('ascii') ) def __ReadBase( self, address, length ): '''基础的读取方法,外界不应该调用本方法''' command = SiemensS7Net.BuildReadCommand( address, length ) if command.IsSuccess == False : return command read = self.ReadFromCoreServer( command.Content ) if read.IsSuccess == False : return read # 分析结果 receiveCount = 0 for i in range(len(length)): receiveCount += length[i] if len(read.Content) >= 21 and read.Content[20] == len(length) : buffer = bytearray(receiveCount) kk = 0 ll = 0 ii = 21 while ii < len(read.Content): if ii + 1 < len(read.Content): if read.Content[ii] == 0xFF and read.Content[ii + 1] == 0x04: # 有数据 buffer[ll : ll + length[kk]] = read.Content[ii+4 : ii+4+length[kk]] ii += length[kk] + 3 ll += length[kk] kk += 1 ii += 1 return OperateResult.CreateSuccessResult( buffer ) else : result = OperateResult() result.ErrorCode = read.ErrorCode result.Message = "数据块长度校验失败" return result def Read( self, address, length ): '''从PLC读取数据,地址格式为I100,Q100,DB20.100,M100,T100,C100以字节为单位''' if type(address) == list and type(length) == list: addressResult = [] for i in range(length): tmp = SiemensS7Net.AnalysisAddress( address[i] ) if tmp.IsSuccess == False : return OperateResult.CreateFailedResult( addressResult[i] ) addressResult.append( tmp ) return self.__ReadBase( addressResult, length ) else: addressResult = SiemensS7Net.AnalysisAddress( address ) if addressResult.IsSuccess == False : return OperateResult.CreateFailedResult( addressResult ) bytesContent = bytearray() alreadyFinished = 0 while alreadyFinished < length : readLength = min( length - alreadyFinished, 200 ) read = self.__ReadBase( [ addressResult ], [ readLength ] ) if read.IsSuccess == True : bytesContent.extend( read.Content ) else: return read alreadyFinished += readLength addressResult.Content2 += readLength * 8 return OperateResult.CreateSuccessResult( bytesContent ) def __ReadBitFromPLC( self, address ): '''从PLC读取数据,地址格式为I100,Q100,DB20.100,M100,以位为单位''' # 指令生成 command = SiemensS7Net.BuildBitReadCommand( address ) if command.IsSuccess == False : return OperateResult.CreateFailedResult( command ) # 核心交互 read = self.ReadFromCoreServer( command.Content ) if read.IsSuccess == False : return read # 分析结果 receiveCount = 1 if len(read.Content) >= 21 and read.Content[20] == 1 : buffer = bytearray(receiveCount) if 22 < len(read.Content) : if read.Content[21] == 0xFF and read.Content[22] == 0x03: # 有数据 buffer[0] = read.Content[25] return OperateResult.CreateSuccessResult( buffer ) else: result = OperateResult() result.ErrorCode = read.ErrorCode result.Message = "数据块长度校验失败" return result def ReadBool( self, address ): '''读取指定地址的bool数据''' return self.GetBoolResultFromBytes( self.__ReadBitFromPLC( address ) ) def ReadByte( self, address ): '''读取指定地址的byte数据''' return self.GetByteResultFromBytes( self.Read( address, 1 ) ) def __WriteBase( self, entireValue ): '''基础的写入数据的操作支持''' write = self.ReadFromCoreServer( entireValue ) if write.IsSuccess == False : return write if write.Content[len(write.Content) - 1] != 0xFF : # 写入异常 return OperateResult( msg = "写入数据异常", err = write.Content[write.Content.Length - 1]) else: return OperateResult.CreateSuccessResult( ) def Write( self, address, value ): '''将数据写入到PLC数据,地址格式为I100,Q100,DB20.100,M100,以字节为单位''' command = self.BuildWriteByteCommand( address, value ) if command.IsSuccess == False : return command return self.__WriteBase( command.Content ) def WriteBool( self, address, value ): '''写入PLC的一个位,例如"M100.6","I100.7","Q100.0","DB20.100.0",如果只写了"M100"默认为"M100.0''' # 生成指令 command = SiemensS7Net.BuildWriteBitCommand( address, value ) if command.IsSuccess == False : return command return self.__WriteBase( command.Content ) def WriteByte( self, address, value ): '''向PLC中写入byte数据,返回值说明''' return self.Write( address, [value] ) class SiemensFetchWriteNet(NetworkDeviceBase): '''使用了Fetch/Write协议来和西门子进行通讯,该种方法需要在PLC侧进行一些配置''' def __init__( self, ipAddress = '127.0.0.1', port = 1000 ): ''' 实例化一个西门子的Fetch/Write协议的通讯对象,可以指定ip地址及端口号''' self.ipAddress = ipAddress self.port = port self.WordLength = 2 @staticmethod def CalculateAddressStarted( address = "M100" ): '''计算特殊的地址信息''' if address.find( '.' ) < 0: return int( address ) else: temp = address.split( '.' ) return int( temp[0] ) @staticmethod def AnalysisAddress( address = "M100" ): '''解析数据地址,解析出地址类型,起始地址,DB块的地址''' result = OperateResult( ) try: result.Content3 = 0 if address[0] == 'I': result.Content1 = 0x03 result.Content2 = SiemensFetchWriteNet.CalculateAddressStarted( address[1:] ) elif address[0] == 'Q': result.Content1 = 0x04 result.Content2 = SiemensFetchWriteNet.CalculateAddressStarted( address[1:] ) elif address[0] == 'M': result.Content1 = 0x02 result.Content2 = SiemensFetchWriteNet.CalculateAddressStarted( address[1:] ) elif address[0] == 'D' or address.startswith("DB"): result.Content1 = 0x01 adds = address.split( '.' ) if address[1] == 'B': result.Content3 = int( adds[0][2:] ) else: result.Content3 = int( adds[0][1:] ) if result.Content3 > 255: result.Message = "DB块数据无法大于255" return result result.Content2 = SiemensFetchWriteNet.CalculateAddressStarted( address[ address.find( '.' ) + 1:] ) elif address[0] == 'T': result.Content1 = 0x07 result.Content2 = SiemensFetchWriteNet.CalculateAddressStarted( address[1:] ) elif address[0] == 'C': result.Content1 = 0x06 result.Content2 = SiemensFetchWriteNet.CalculateAddressStarted( address[1:]) else: result.Message = StringResources.NotSupportedDataType() result.Content1 = 0 result.Content2 = 0 result.Content3 = 0 return result except Exception as ex: result.Message = str(ex) return result result.IsSuccess = True return result @staticmethod def BuildReadCommand( address, count ): '''生成一个读取字数据指令头的通用方法''' result = OperateResult( ) analysis = SiemensFetchWriteNet.AnalysisAddress( address ) if analysis.IsSuccess == False : result.CopyErrorFromOther( analysis ) return result _PLCCommand = bytearray(16) _PLCCommand[0] = 0x53 _PLCCommand[1] = 0x35 _PLCCommand[2] = 0x10 _PLCCommand[3] = 0x01 _PLCCommand[4] = 0x03 _PLCCommand[5] = 0x05 _PLCCommand[6] = 0x03 _PLCCommand[7] = 0x08 # 指定数据区 _PLCCommand[8] = analysis.Content1 _PLCCommand[9] = analysis.Content3 # 指定数据地址 _PLCCommand[10] =analysis.Content2 // 256 _PLCCommand[11] = analysis.Content2 % 256 if analysis.Content1 == 0x01 or analysis.Content1 == 0x06 or analysis.Content1 == 0x07: if count % 2 != 0: result.Message = "读取的数据长度必须为偶数" return result else: # 指定数据长度 _PLCCommand[12] = count // 2 // 256 _PLCCommand[13] = count // 2 % 256 else: # 指定数据长度 _PLCCommand[12] = count // 256 _PLCCommand[13] = count % 256 _PLCCommand[14] = 0xff _PLCCommand[15] = 0x02 result.Content = _PLCCommand result.IsSuccess = True return result @staticmethod def BuildWriteCommand( address, data ): '''生成一个写入字节数据的指令''' if data == None : data = bytearray(0) result = OperateResult( ) analysis = SiemensFetchWriteNet.AnalysisAddress( address ) if analysis.IsSuccess == False: result.CopyErrorFromOther( analysis ) return result _PLCCommand = bytearray(16 + len(data)) _PLCCommand[0] = 0x53 _PLCCommand[1] = 0x35 _PLCCommand[2] = 0x10 _PLCCommand[3] = 0x01 _PLCCommand[4] = 0x03 _PLCCommand[5] = 0x03 _PLCCommand[6] = 0x03 _PLCCommand[7] = 0x08 # 指定数据区 _PLCCommand[8] = analysis.Content1 _PLCCommand[9] = analysis.Content3 # 指定数据地址 _PLCCommand[10] = analysis.Content2 // 256 _PLCCommand[11] = analysis.Content2 % 256 if analysis.Content1 == 0x01 or analysis.Content1 == 0x06 or analysis.Content1 == 0x07: if data.Length % 2 != 0: result.Message = "写入的数据长度必须为偶数" return result else: # 指定数据长度 _PLCCommand[12] = data.Length // 2 // 256 _PLCCommand[13] = data.Length // 2 % 256 else: # 指定数据长度 _PLCCommand[12] = data.Length // 256 _PLCCommand[13] = data.Length % 256 _PLCCommand[14] = 0xff _PLCCommand[15] = 0x02 # 放置数据 _PLCCommand[16:16+len(data)] = data result.Content = _PLCCommand result.IsSuccess = True return result def Read( self, address, length ): '''从PLC读取数据,地址格式为I100,Q100,DB20.100,M100,T100,C100,以字节为单位''' # 指令解析 -> Instruction parsing command = SiemensFetchWriteNet.BuildReadCommand( address, length ) if command.IsSuccess == False : return command # 核心交互 -> Core Interactions read = self.ReadFromCoreServer( command.Content ) if read.IsSuccess == False : return read # 错误码验证 -> Error code Verification if read.Content[8] != 0x00 : return OperateResult(read.Content[8],"发生了异常,具体信息查找Fetch/Write协议文档") # 读取正确 -> Read Right buffer = bytearray(len(read.Content) - 16) buffer[0:len(buffer)] = read.Content[16:16+len(buffer)] return OperateResult.CreateSuccessResult( buffer ) def ReadByte( self, address ): '''读取指定地址的byte数据''' return self.GetByteResultFromBytes( self.Read( address, 1 ) ) def Write( self, address, value ): '''将数据写入到PLC数据,地址格式为I100,Q100,DB20.100,M100,以字节为单位''' # 指令解析 -> Instruction parsing command = SiemensFetchWriteNet.BuildWriteCommand( address, value ) if command.IsSuccess == False : return command # 核心交互 -> Core Interactions write = self.ReadFromCoreServer( command.Content ) if write.IsSuccess == False : return write # 错误码验证 -> Error code Verification if (write.Content[8] != 0x00) : OperateResult(err = write.Content[8], msg = "西门子PLC写入失败!") # 写入成功 -> Write Right return OperateResult.CreateSuccessResult( ) def WriteBool( self, address, values): '''向PLC中写入byte数据,返回是否写入成功 -> Writes byte data to the PLC and returns whether the write succeeded''' if type(values) == list: return self.Write( address, SoftBasic.BoolArrayToByte( values ) ) else: return self.WriteBool( address, [ values ] ) # Omron PLC 通讯类 class OmronFinsDataType: '''欧姆龙的Fins协议的数据类型''' BitCode = 0 WordCode = 0 def __init__(self, bitCode = 0, wordCode = 0): '''实例化一个Fins的数据类型''' self.BitCode = bitCode self.WordCode = wordCode @staticmethod def DM(): '''DM Area''' return OmronFinsDataType( 0x02, 0x82 ) @staticmethod def CIO(): '''CIO Area''' return OmronFinsDataType( 0x30, 0xB0 ) @staticmethod def WR(): '''Work Area''' return OmronFinsDataType( 0x31, 0xB1 ) @staticmethod def HR(): '''Holding Bit Area''' return OmronFinsDataType( 0x32, 0xB2 ) @staticmethod def AR(): '''Auxiliary Bit Area''' return OmronFinsDataType( 0x33, 0xB3 ) class OmronFinsNet(NetworkDoubleBase): '''欧姆龙PLC通讯类,采用Fins-Tcp通信协议实现''' def __init__(self,ipAddress="127.0.0.1",port = 1000): '''实例化一个欧姆龙PLC Fins帧协议的通讯对象''' self.ipAddress = ipAddress self.port = port ICF = 0 RSV = 0 GCT = 0 DNA = 0 DA1 = 0 DA2 = 0 SNA = 0 SA1 = 0 SA2 = 0 SID = 0 def SetSA1(self, value): '''设置SA1的方法''' self.SA1 = value self.handSingle[19] = value handSingle = bytearray([0x46, 0x49, 0x4E, 0x53,0x00, 0x00, 0x00, 0x0C, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01]) @staticmethod def AnalysisAddress( address, isBit ): result = OperateResult( ) try: if address[0] == 'D' or address[0] == 'd': # DM区数据 result.Content1 = OmronFinsDataType.DM elif address[0] == 'C' or address[0] == 'c': # CIO区数据 result.Content1 = OmronFinsDataType.CIO elif address[0] == 'W' or address[0] == 'w': # WR区 result.Content1 = OmronFinsDataType.WR elif address[0] == 'H' or address[0] == 'h': # HR区 result.Content1 = OmronFinsDataType.HR elif address[0] == 'A' or address[0] == 'a': # AR区 result.Content1 = OmronFinsDataType.AR else: raise RuntimeError( StringResources.NotSupportedDataType() ) if isBit == True: # 位操作 splits = address[1:].split('.') addr = int( splits[0] ) result.Content2 = bytearray(3) result.Content2[0] = struct.pack(' 1: result.Content2[2] = int(splits[1]) if result.Content2[2] > 15: raise RuntimeError( "欧姆龙位地址必须0-15之间" ) else: # 字操作 addr = int( address[1:] ) result.Content2 = bytearray(3) result.Content2[0] = struct.pack('= 16: # 提取错误码 buffer = bytearray(4) buffer[0] = response[15] buffer[1] = response[14] buffer[2] = response[13] buffer[3] = response[12] err = struct.unpack( ' 0 : return OperateResult( err = err, msg = OmronFinsNet.GetStatusDescription( err ) ) if response.Length >= 30: err = response[28] * 256 + response[29] if err > 0 : return OperateResult( err = err, msg = "欧姆龙数据接收出错" ) if isRead == False : return OperateResult.CreateSuccessResult( bytearray(0) ) # 读取操作 content = bytearray(len(response) - 30) if len(content) > 0 : content[0:len(content)] = response[30:] return OperateResult.CreateSuccessResult( content ) return OperateResult( msg = "欧姆龙数据接收出错" ) @staticmethod def GetStatusDescription( err ): '''获取错误信息的字符串描述文本''' if err == 0: return StringResources.OmronStatus0() elif err == 1: return StringResources.OmronStatus1() elif err == 2: return StringResources.OmronStatus2() elif err == 3: return StringResources.OmronStatus3() elif err == 20: return StringResources.OmronStatus20() elif err == 21: return StringResources.OmronStatus21() elif err == 22: return StringResources.OmronStatus22() elif err == 23: return StringResources.OmronStatus23() elif err == 24: return StringResources.OmronStatus24() elif err == 25: return StringResources.OmronStatus25() else: return StringResources.UnknownError() def PackCommand( self, cmd ): '''将普通的指令打包成完整的指令''' buffer = bytearray(26 + len(cmd)) buffer[0:4] = self.handSingle[0:4] tmp = struct.pack('>i', len(buffer) - 8 ) buffer[4:8] = tmp buffer[11] = 0x02 buffer[16] = self.ICF buffer[17] = self.RSV buffer[18] = self.GCT buffer[19] = self.DNA buffer[20] = self.DA1 buffer[21] = self.DA2 buffer[22] = self.SNA buffer[23] = self.SA1 buffer[24] = self.SA2 buffer[25] = self.SID buffer[26:] = cmd return buffer def BuildReadCommand( self, address, length , isBit): '''根据类型地址长度确认需要读取的指令头''' analysis = OmronFinsNet.AnalysisAddress( address, isBit ) if analysis.IsSuccess == False : return OperateResult.CreateFailedResult( analysis ) _PLCCommand = bytearray(8) _PLCCommand[0] = 0x01 _PLCCommand[1] = 0x01 if isBit == True: _PLCCommand[2] = analysis.Content1.BitCode else: _PLCCommand[2] = analysis.Content1.WordCode _PLCCommand[3:6] = analysis.Content2 _PLCCommand[6] = length / 256 _PLCCommand[7] = length % 256 return OperateResult.CreateSuccessResult( self.PackCommand( _PLCCommand ) ) def BuildWriteCommand( self, address, value, isBit ): '''根据类型地址以及需要写入的数据来生成指令头''' analysis = self.AnalysisAddress( address, isBit ) if analysis.IsSuccess == False : return OperateResult.CreateFailedResult( analysis ) _PLCCommand = bytearray(8 + len(value)) _PLCCommand[0] = 0x01 _PLCCommand[1] = 0x02 if isBit == True: _PLCCommand[2] = analysis.Content1.BitCode else: _PLCCommand[2] = analysis.Content1.WordCode _PLCCommand[3:6] = analysis.Content2 if isBit == True: _PLCCommand[6] = len(value) // 256 _PLCCommand[7] = len(value) % 256 else: _PLCCommand[6] = len(value) // 2 // 256 _PLCCommand[7] = len(value) // 2 % 256 _PLCCommand[8:] = value return OperateResult.CreateSuccessResult( self.PackCommand( _PLCCommand ) ) def InitializationOnConnect( self, socket ): '''在连接上欧姆龙PLC后,需要进行一步握手协议''' # 握手信号 read = self.ReadFromCoreServerBase( socket, self.handSingle ) if read.IsSuccess == False : return read # 检查返回的状态 buffer = bytearray(4) buffer[0] = read.Content2[7] buffer[1] = read.Content2[6] buffer[2] = read.Content2[5] buffer[3] = read.Content2[4] status = struct.unpack( '= 16 : self.DA1 = read.Content2[15] return OperateResult.CreateSuccessResult( ) def Read( self, address, length ): '''从欧姆龙PLC中读取想要的数据,返回读取结果,读取单位为字''' # 获取指令 command = self.BuildReadCommand( address, length, False ) if command.IsSuccess == False : return OperateResult.CreateFailedResult( command ) # 核心数据交互 read = self.ReadFromCoreServer( command.Content ) if read.IsSuccess == False : return OperateResult.CreateFailedResult( read ) # 数据有效性分析 valid = OmronFinsNet.ResponseValidAnalysis( read.Content, True ) if valid.IsSuccess == False : return OperateResult.CreateFailedResult( valid ) # 读取到了正确的数据 return OperateResult.CreateSuccessResult( valid.Content ) def ReadBool( self, address, length = None ): '''从欧姆龙PLC中批量读取位软元件,返回读取结果''' if length == None: read = self.ReadBool( address, 1 ) if read.IsSuccess == False : return OperateResult.CreateFailedResult( read ) return OperateResult.CreateSuccessResult( read.Content[0] ) else: # 获取指令 command = self.BuildReadCommand( address, length, True ) if command.IsSuccess == False : return OperateResult.CreateFailedResult( command ) # 核心数据交互 read = self.ReadFromCoreServer( command.Content ) if read.IsSuccess == False : return OperateResult.CreateFailedResult( read ) # 数据有效性分析 valid = OmronFinsNet.ResponseValidAnalysis( read.Content, True ) if valid.IsSuccess == False : return OperateResult.CreateFailedResult( valid ) # 返回正确的数据信息 content = [] for i in range(len(read.Content)): if read.Content[i] == 0x01: content.append(True) else: content.append(False) return OperateResult.CreateSuccessResult( content ) def Write( self, address, value ): '''向PLC中位软元件写入bool数组,返回值说明,比如你写入D100,values[0]对应D100.0''' # 获取指令 command = self.BuildWriteCommand( address, value, False ) if command.IsSuccess == False : return command # 核心数据交互 read = self.ReadFromCoreServer( command.Content ) if read.IsSuccess == False : return read # 数据有效性分析 valid = OmronFinsNet.ResponseValidAnalysis( read.Content, False ) if valid.IsSuccess == False : return valid # 成功 return OperateResult.CreateSuccessResult( ) def WriteBool( self, address, values ): '''向PLC中位软元件写入bool数组,返回值说明,比如你写入D100,values[0]对应D100.0''' if type(values) == list: # 获取指令 content = bytearray(len(values)) for i in range(len(values)): if values[i] == True: content[i] = 0x01 else: content[i] = 0x00 command = self.BuildWriteCommand( address, content, True ) if command.IsSuccess == False : return command # 核心数据交互 read = self.ReadFromCoreServer( command.Content ) if read.IsSuccess == False : return read # 数据有效性分析 valid = OmronFinsNet.ResponseValidAnalysis( read.Content, False ) if valid.IsSuccess == False : return valid # 写入成功 return OperateResult.CreateSuccessResult( ) else: return self.WriteBool( address, [values] ) # NetSimplifyClient类 class NetSimplifyClient(NetworkDoubleBase): '''异步访问数据的客户端类,用于向服务器请求一些确定的数据信息''' def __init__(self, ipAddress, port): '''实例化一个客户端的对象,用于和服务器通信''' self.iNetMessage = HslMessage() self.byteTransform = RegularByteTransform() self.ipAddress = ipAddress self.port = port def ReadBytesFromServer( self, customer, send = None): '''客户端向服务器进行请求,请求字节数据''' return self.__ReadFromServerBase( HslProtocol.CommandBytes( customer, self.Token, send)) def ReadStringFromServer( self, customer, send = None): '''客户端向服务器进行请求,请求字符串数据''' read = self.__ReadFromServerBase( HslProtocol.CommandString( customer, self.Token, send)) if read.IsSuccess == False: return OperateResult.CreateFailedResult( read ) return OperateResult.CreateSuccessResult( read.Content.decode('utf-16') ) def __ReadFromServerBase( self, send): '''需要发送的底层数据''' read = self.ReadFromCoreServer( send ) if read.IsSuccess == False: return read headBytes = bytearray(HslProtocol.HeadByteLength()) contentBytes = bytearray(len(read.Content) - HslProtocol.HeadByteLength()) headBytes[0:HslProtocol.HeadByteLength()] = read.Content[0:HslProtocol.HeadByteLength()] if len(contentBytes) > 0: contentBytes[0:len(contentBytes)] = read.Content[HslProtocol.HeadByteLength():len(read.Content)] contentBytes = HslProtocol.CommandAnalysis( headBytes, contentBytes ) return OperateResult.CreateSuccessResult( contentBytes ) class AppSession: '''网络会话信息''' IpAddress = "127.0.0.1" Port = 12345 LoginAlias = "" HeartTime = None ClientType = "" ClientUniqueID = "" BytesHead = bytearray(32) BytesContent = bytearray(0) KeyGroup = "" WorkSocket = socket.socket() HybirdLockSend = threading.Lock() def __init__( self ): self.ClientUniqueID = SoftBasic.GetUniqueStringByGuidAndRandom() self.HeartTime = datetime.datetime.now() def Clear( self ): self.BytesHead = bytearray(HslProtocol.HeadByteLength()) self.BytesContent = None class NetworkXBase(NetworkBase): '''多功能网络类的基类''' ThreadBack = None def __init__(self): return def SendBytesAsync( self, session, content ): '''发送数据的方法''' if content == None : return session.HybirdLockSend.acquire() self.Send( session.WorkSocket, content ) session.HybirdLockSend.release() def ThreadBackground( self, session ): while True: if session.WorkSocket == None : break readHeadBytes = self.Receive(session.WorkSocket,HslProtocol.HeadByteLength()) if readHeadBytes.IsSuccess == False : self.SocketReceiveException( session ) return length = struct.unpack( ' 0) ThreadPool.QueueUserWorkItem( new WaitCallback( ThreadPoolCheckTimeOut ), hslTimeOut ); # 接收头指令 headResult = self.Receive(socket, HslProtocol.HeadByteLength()) if headResult.IsSuccess == False: return OperateResult.CreateFailedResult(headResult) # 检查令牌 if self.CheckRemoteToken(headResult.Content) == False: self.CloseSocket(socket) return OperateResult( msg = StringResources.TokenCheckFailed() ) contentLength = struct.unpack( '