From 718fc750d906fd0a777a2361f78777423f1c157a Mon Sep 17 00:00:00 2001 From: MrZ626 <1046101471@qq.com> Date: Tue, 20 Jul 2021 01:02:18 +0800 Subject: [PATCH] =?UTF-8?q?websocket=E7=9A=84=E5=AD=90=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E5=88=86=E7=A6=BB=E8=87=B3=E5=8D=95=E7=8B=AC?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E5=B9=B6=E7=A7=BB=E9=99=A4debugMode?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Zframework/websocket.lua | 218 +------------------------------- Zframework/websocket_thread.lua | 202 +++++++++++++++++++++++++++++ 2 files changed, 204 insertions(+), 216 deletions(-) create mode 100644 Zframework/websocket_thread.lua diff --git a/Zframework/websocket.lua b/Zframework/websocket.lua index 9083738d..7e358638 100644 --- a/Zframework/websocket.lua +++ b/Zframework/websocket.lua @@ -6,224 +6,10 @@ local host= local port='10026' local path='/tech/socket/v1' -local debugMode=''--S:send, R:receive, M=mark - -local wsThread=[[ -- lua + LÖVE threading websocket client -- Original pure lua ver. by flaribbit and Particle_G -- Threading version by MrZ -local triggerCHN,sendCHN,readCHN,threadName=... - -local CHN_demand,CHN_getCount=triggerCHN.demand,triggerCHN.getCount -local CHN_push,CHN_pop=triggerCHN.push,triggerCHN.pop - -local SOCK=require'socket'.tcp() -local JSON=require'Zframework.json' - -do--Connect - local host=CHN_demand(sendCHN) - local port=CHN_demand(sendCHN) - local path=CHN_demand(sendCHN) - local body=CHN_demand(sendCHN) - local timeout=CHN_demand(sendCHN) - - SOCK:settimeout(timeout) - local res,err=SOCK:connect(host,port) - if err then CHN_push(readCHN,err)return end - - --WebSocket handshake - if not body then body=''end - SOCK:send( - 'GET '..path..' HTTP/1.1\r\n'.. - 'Host: '..host..':'..port..'\r\n'.. - 'Connection: Upgrade\r\n'.. - 'Upgrade: websocket\r\n'.. - 'Content-Type: application/json\r\n'.. - 'Content-Length: '..#body..'\r\n'.. - 'Sec-WebSocket-Version: 13\r\n'.. - 'Sec-WebSocket-Key: osT3F7mvlojIvf3/8uIsJQ==\r\n\r\n'..--secKey - body - ) - - --First line of HTTP - res,err=SOCK:receive('*l') - if not res then CHN_push(readCHN,err)return end - local code,ctLen - code=res:find(' ') - code=res:sub(code+1,code+3) - - --Get body length from headers and remove headers - repeat - res,err=SOCK:receive('*l') - if not res then CHN_push(readCHN,err)return end - if not ctLen and res:find('length')then - ctLen=tonumber(res:match('%d+')) - end - until res=='' - - --Result - if ctLen then - if code=='101'then - CHN_push(readCHN,'success') - else - res,err=SOCK:receive(ctLen) - if not res then - CHN_push(readCHN,err) - else - res=JSON.decode(res) - CHN_push(readCHN,(code or"XXX")..":"..(res and res.reason or"Server Error")) - end - return - end - end - SOCK:settimeout(0) -end - - - -local byte=string.byte -local band,shl=bit.band,bit.lshift - -local _send do - local char=string.char - local bor,bxor=bit.bor,bit.bxor - local shr=bit.rshift - - local mask_key={1,14,5,14} - local mask_str=char(unpack(mask_key)) - - function _send(op,message) - ]]..(debugMode:find'S'and''or'--')..[[print((">> %s[%d]:%s"):format(threadName,#message,message)) - --Message type - SOCK:send(char(bor(0x80,op))) - - if message then - --Length - local length=#message - if length>65535 then - SOCK:send(char(bor(127,0x80),0,0,0,0,band(shr(length,24),0xff),band(shr(length,16),0xff),band(shr(length,8),0xff),band(length,0xff))) - elseif length>125 then - SOCK:send(char(bor(126,0x80),band(shr(length,8),0xff),band(length,0xff))) - else - SOCK:send(char(bor(length,0x80))) - end - SOCK:send(mask_str) - local msgbyte={byte(message,1,length)} - for i=1,length do - msgbyte[i]=bxor(msgbyte[i],mask_key[(i-1)%4+1]) - end - return SOCK:send(char(unpack(msgbyte))) - else - SOCK:send('\128'..mask_str) - return 0 - end - end -end -local res,err -local op,fin -local length -local lBuffer=""--Long multi-data buffer -local UFF--Un-finished-frame mode -local sBuffer=""--Short multi-frame buffer -while true do--Running - CHN_demand(triggerCHN) - - --Send - while CHN_getCount(sendCHN)>=2 do - local op=CHN_pop(sendCHN) - local message=CHN_pop(sendCHN) - _send(op,message) - end - - --Read - while true do - if not UFF then--UNF process - --Byte 0-1 - res,err=SOCK:receive(2) - if err then break end - - op=band(byte(res,1),0x0f) - fin=band(byte(res,1),0x80)==0x80 - - --Calculating data length - length=band(byte(res,2),0x7f) - if length==126 then - res=SOCK:receive(2) - length=shl(byte(res,1),8)+byte(res,2) - elseif length==127 then - local b={byte(SOCK:receive(8),1,8)} - length=shl(b[5],24)+shl(b[6],16)+shl(b[7],8)+b[8] - end - - if length>0 then - --Receive data - local s,_,p=SOCK:receive(length) - if s then - res=s - elseif p then--UNF head - ]]..(debugMode:find'R'and''or'--')..[[print(("<< %s[%d/%d]:%s"):format(threadName,#p,length,#p<50 and p or p:sub(1,50))) - UFF=true - sBuffer=sBuffer..p - length=length-#p - break - end - else - res="" - end - else - local s,e,p=SOCK:receive(length) - if s then - ]]..(debugMode:find'R'and''or'--')..[[print(("<< %s(%d):%s"):format(threadName,length,#s<50 and s or s:sub(1,50))) - sBuffer=sBuffer..s - length=length-#s - elseif p then - ]]..(debugMode:find'R'and''or'--')..[[print(("<< %s(%d):%s"):format(threadName,length,#p<50 and p or p:sub(1,50))) - sBuffer=sBuffer..p - length=length-#p - end - if length==0 then - res,sBuffer=sBuffer,"" - UFF=false - else - break - end - end - ]]..(debugMode:find'R'and''or'--')..[[print(("<< %s[(%d)]:%s"):format(threadName,#res,#res<800 and res or res:sub(1,150).."\n...\n"..res:sub(-150))) - - --React - if op==8 then--8=close - CHN_push(readCHN,op) - SOCK:close() - if type(res)=='string'then - CHN_push(readCHN,res:sub(3))--Warning: with 2 bytes close code - else - CHN_push(readCHN,"WS Error") - end - elseif op==0 then--0=continue - lBuffer=lBuffer..res - if fin then - ]]..(debugMode:find'M'and''or'--')..[[print("FIN=1 (c") - CHN_push(readCHN,lBuffer) - lBuffer="" - else - ]]..(debugMode:find'M'and''or'--')..[[print("FIN=0 (c") - end - else - CHN_push(readCHN,op) - if fin then - ]]..(debugMode:find'M'and''or'--')..[[print("OP: "..op.."\tFIN=1") - CHN_push(readCHN,res) - else - ]]..(debugMode:find'M'and''or'--')..[[print("OP: "..op.."\tFIN=0") - sBuffer=res - ]]..(debugMode:find'M'and''or'--')..[[print("START pack: "..res) - end - end - end -end -]] - local type=type local timer=love.timer.getTime local CHN=love.thread.newChannel() @@ -257,7 +43,7 @@ end function WS.connect(name,subPath,body,timeout) local ws={ real=true, - thread=love.thread.newThread(wsThread), + thread=love.thread.newThread('Zframework/websocket_thread.lua'), triggerCHN=love.thread.newChannel(), sendCHN=love.thread.newChannel(), readCHN=love.thread.newChannel(), @@ -270,7 +56,7 @@ function WS.connect(name,subPath,body,timeout) pongTimer=0, } wsList[name]=ws - ws.thread:start(ws.triggerCHN,ws.sendCHN,ws.readCHN,name) + ws.thread:start(ws.triggerCHN,ws.sendCHN,ws.readCHN) CHN_push(ws.sendCHN,host) CHN_push(ws.sendCHN,port) CHN_push(ws.sendCHN,path..subPath) diff --git a/Zframework/websocket_thread.lua b/Zframework/websocket_thread.lua new file mode 100644 index 00000000..eb4a21da --- /dev/null +++ b/Zframework/websocket_thread.lua @@ -0,0 +1,202 @@ +local triggerCHN,sendCHN,readCHN=... + +local CHN_demand,CHN_getCount=triggerCHN.demand,triggerCHN.getCount +local CHN_push,CHN_pop=triggerCHN.push,triggerCHN.pop + +local SOCK=require'socket'.tcp() +local JSON=require'Zframework.json' + +do--Connect + local host=CHN_demand(sendCHN) + local port=CHN_demand(sendCHN) + local path=CHN_demand(sendCHN) + local body=CHN_demand(sendCHN) + local timeout=CHN_demand(sendCHN) + + SOCK:settimeout(timeout) + local res,err=SOCK:connect(host,port) + if err then CHN_push(readCHN,err)return end + + --WebSocket handshake + if not body then body=''end + SOCK:send( + 'GET '..path..' HTTP/1.1\r\n'.. + 'Host: '..host..':'..port..'\r\n'.. + 'Connection: Upgrade\r\n'.. + 'Upgrade: websocket\r\n'.. + 'Content-Type: application/json\r\n'.. + 'Content-Length: '..#body..'\r\n'.. + 'Sec-WebSocket-Version: 13\r\n'.. + 'Sec-WebSocket-Key: osT3F7mvlojIvf3/8uIsJQ==\r\n\r\n'..--secKey + body + ) + + --First line of HTTP + res,err=SOCK:receive('*l') + if not res then CHN_push(readCHN,err)return end + local code,ctLen + code=res:find(' ') + code=res:sub(code+1,code+3) + + --Get body length from headers and remove headers + repeat + res,err=SOCK:receive('*l') + if not res then CHN_push(readCHN,err)return end + if not ctLen and res:find('length')then + ctLen=tonumber(res:match('%d+')) + end + until res=='' + + --Result + if ctLen then + if code=='101'then + CHN_push(readCHN,'success') + else + res,err=SOCK:receive(ctLen) + if not res then + CHN_push(readCHN,err) + else + res=JSON.decode(res) + CHN_push(readCHN,(code or"XXX")..":"..(res and res.reason or"Server Error")) + end + return + end + end +end + +local byte=string.byte +local band,shl=bit.band,bit.lshift + +local _send do + local char=string.char + local bor,bxor=bit.bor,bit.bxor + local shr=bit.rshift + + local mask_key={1,14,5,14} + local mask_str=char(unpack(mask_key)) + + function _send(op,message) + --Message type + SOCK:send(char(bor(0x80,op))) + + if message then + --Length + local length=#message + if length>65535 then + SOCK:send(char(bor(127,0x80),0,0,0,0,band(shr(length,24),0xff),band(shr(length,16),0xff),band(shr(length,8),0xff),band(length,0xff))) + elseif length>125 then + SOCK:send(char(bor(126,0x80),band(shr(length,8),0xff),band(length,0xff))) + else + SOCK:send(char(bor(length,0x80))) + end + SOCK:send(mask_str) + local msgbyte={byte(message,1,length)} + for i=1,length do + msgbyte[i]=bxor(msgbyte[i],mask_key[(i-1)%4+1]) + end + return SOCK:send(char(unpack(msgbyte))) + else + SOCK:send('\128'..mask_str) + return 0 + end + end +end + +local res,err +local op,fin +local length +local lBuffer=""--Long multi-data buffer +local UFF--Un-finished-frame mode +local sBuffer=""--Short multi-frame buffer +while true do--Running + CHN_demand(triggerCHN) + + --Send + while CHN_getCount(sendCHN)>=2 do + op=CHN_pop(sendCHN) + local message=CHN_pop(sendCHN) + _send(op,message) + end + + --Read + while true do + if not UFF then--UNF process + --Byte 0-1 + res,err=SOCK:receive(2) + if err then break end + + op=band(byte(res,1),0x0f) + fin=band(byte(res,1),0x80)==0x80 + + --Calculating data length + length=band(byte(res,2),0x7f) + if length==126 then + res,err=SOCK:receive(2) + if res then + length=shl(byte(res,1),8)+byte(res,2)--!!!!! + else + CHN_push(readCHN,8)--close + CHN_push(readCHN,'{"reason":"'..(err or"error_01")..'"}') + end + elseif length==127 then + local b={byte(SOCK:receive(8),1,8)} + length=shl(b[5],24)+shl(b[6],16)+shl(b[7],8)+b[8] + end + + if length>0 then + --Receive data + local s,_,p=SOCK:receive(length) + if s then + res=s + elseif p then--UNF head + UFF=true + sBuffer=sBuffer..p + length=length-#p + break + end + else + res="" + end + else + local s,_,p=SOCK:receive(length) + if s then + sBuffer=sBuffer..s + length=length-#s + elseif p then + sBuffer=sBuffer..p + length=length-#p + end + if length==0 then + res,sBuffer=sBuffer,"" + UFF=false + else + break + end + end + + --React + if op==8 then--8=close + CHN_push(readCHN,op) + SOCK:close() + if type(res)=='string'then + CHN_push(readCHN,res:sub(3))--Warning: with 2 bytes close code + else + CHN_push(readCHN,"WS Error") + end + elseif op==0 then--0=continue + lBuffer=lBuffer..res + if fin then + CHN_push(readCHN,lBuffer) + lBuffer="" + else + end + else + CHN_push(readCHN,op) + if fin then + CHN_push(readCHN,res) + else + sBuffer=res + end + end + end +end \ No newline at end of file