diff --git a/Zframework/websocket.lua b/Zframework/websocket.lua index 7e358638..be219b31 100644 --- a/Zframework/websocket.lua +++ b/Zframework/websocket.lua @@ -49,7 +49,7 @@ function WS.connect(name,subPath,body,timeout) readCHN=love.thread.newChannel(), lastPingTime=0, lastPongTime=timer(), - pingInterval=12, + pingInterval=6, status='connecting',--'connecting', 'running', 'dead' sendTimer=0, alertTimer=0, @@ -119,7 +119,11 @@ function WS.read(name) local ws=wsList[name] if ws.real and ws.status~='connecting'and CHN_getCount(ws.readCHN)>=2 then local op,message=CHN_pop(ws.readCHN),CHN_pop(ws.readCHN) - if op==8 then ws.status='dead'end--8=close + if op==8 then--8=close + ws.status='dead' + elseif op==9 then--9=ping + WS.send(name,message or"",'pong') + end ws.lastPongTime=timer() ws.pongTimer=1 return message,OPname[op]or op @@ -139,35 +143,41 @@ function WS.update(dt) local time=timer() for name,ws in next,wsList do if ws.real then - if CHN_getCount(ws.triggerCHN)==0 then - CHN_push(ws.triggerCHN,0) - end - if ws.status=='connecting'then - local mes=CHN_pop(ws.readCHN) - if mes then - if mes=='success'then - ws.status='running' + if ws.thread:isRunning()then + if CHN_getCount(ws.triggerCHN)==0 then + CHN_push(ws.triggerCHN,0) + end + if ws.status=='connecting'then + local mes=CHN_pop(ws.readCHN) + if mes then + if mes=='success'then + ws.status='running' + ws.lastPingTime=time + ws.lastPongTime=time + ws.pongTimer=1 + else + ws.status='dead' + MES.new('warn',text.wsFailed..": "..(mes=="timeout"and text.netTimeout or mes)) + end + end + elseif ws.status=='running'then + if time-ws.lastPingTime>ws.pingInterval then + CHN_push(ws.sendCHN,9) + CHN_push(ws.sendCHN,"")--ping ws.lastPingTime=time - ws.lastPongTime=time - ws.pongTimer=1 - else - ws.status='dead' - MES.new('warn',text.wsFailed..": "..(mes=="timeout"and text.netTimeout or mes)) + end + if time-ws.lastPongTime>6+2*ws.pingInterval then + WS.close(name) end end - elseif ws.status=='running'then - if time-ws.lastPingTime>ws.pingInterval then - CHN_push(ws.sendCHN,9) - CHN_push(ws.sendCHN,"")--ping - ws.lastPingTime=time - end - if time-ws.lastPongTime>6+2*ws.pingInterval then - WS.close(name) - end + if ws.sendTimer>0 then ws.sendTimer=ws.sendTimer-dt end + if ws.pongTimer>0 then ws.pongTimer=ws.pongTimer-dt end + if ws.alertTimer>0 then ws.alertTimer=ws.alertTimer-dt end + else + ws.status='dead' + ws.real=false + MES.new('warn',text.wsClose.."线程错误 Thread error") end - if ws.sendTimer>0 then ws.sendTimer=ws.sendTimer-dt end - if ws.pongTimer>0 then ws.pongTimer=ws.pongTimer-dt end - if ws.alertTimer>0 then ws.alertTimer=ws.alertTimer-dt end end end end diff --git a/Zframework/websocket_thread.lua b/Zframework/websocket_thread.lua index eb4a21da..ea80a5ce 100644 --- a/Zframework/websocket_thread.lua +++ b/Zframework/websocket_thread.lua @@ -64,139 +64,133 @@ do--Connect end end -local byte=string.byte -local band,shl=bit.band,bit.lshift +local YIELD=coroutine.yield +local byte,char=string.byte,string.char +local band,bor,bxor=bit.band,bit.bor,bit.bxor +local shl,shr=bit.lshift,bit.rshift -local _send do - local char=string.char - local bor,bxor=bit.bor,bit.bxor - local shr=bit.rshift +local mask_key={1,14,5,14} +local mask_str=char(unpack(mask_key)) +local function _send(op,message) + --Message type + SOCK:send(char(bor(op,0x80))) - local mask_key={1,14,5,14} - local mask_str=char(unpack(mask_key)) - - function _send(op,message) - --Message type - SOCK:send(char(bor(0x80,op))) - - if message then - --Length - local length=#message - if length>65535 then - SOCK:send(char(bor(127,0x80),0,0,0,0,band(shr(length,24),0xff),band(shr(length,16),0xff),band(shr(length,8),0xff),band(length,0xff))) - elseif length>125 then - SOCK:send(char(bor(126,0x80),band(shr(length,8),0xff),band(length,0xff))) - else - SOCK:send(char(bor(length,0x80))) - end - SOCK:send(mask_str) - local msgbyte={byte(message,1,length)} - for i=1,length do - msgbyte[i]=bxor(msgbyte[i],mask_key[(i-1)%4+1]) - end - return SOCK:send(char(unpack(msgbyte))) + if message then + --Length + local length=#message + if length>65535 then + SOCK:send(char(bor(127,0x80),0,0,0,0,band(shr(length,24),0xff),band(shr(length,16),0xff),band(shr(length,8),0xff),band(length,0xff))) + elseif length>125 then + SOCK:send(char(bor(126,0x80),band(shr(length,8),0xff),band(length,0xff))) else - SOCK:send('\128'..mask_str) - return 0 + SOCK:send(char(bor(length,0x80))) end + local msgbyte={byte(message,1,length)} + for i=1,length do + msgbyte[i]=bxor(msgbyte[i],mask_key[(i-1)%4+1]) + end + return SOCK:send(mask_str..char(unpack(msgbyte))) + else + SOCK:send('\128'..mask_str) + return 0 end end - -local res,err -local op,fin -local length -local lBuffer=""--Long multi-data buffer -local UFF--Un-finished-frame mode -local sBuffer=""--Short multi-frame buffer -while true do--Running - CHN_demand(triggerCHN) - - --Send - while CHN_getCount(sendCHN)>=2 do - op=CHN_pop(sendCHN) - local message=CHN_pop(sendCHN) - _send(op,message) - end - - --Read +local sendThread=coroutine.wrap(function() while true do - if not UFF then--UNF process - --Byte 0-1 - res,err=SOCK:receive(2) - if err then break end - - op=band(byte(res,1),0x0f) - fin=band(byte(res,1),0x80)==0x80 - - --Calculating data length - length=band(byte(res,2),0x7f) - if length==126 then - res,err=SOCK:receive(2) - if res then - length=shl(byte(res,1),8)+byte(res,2)--!!!!! - else - CHN_push(readCHN,8)--close - CHN_push(readCHN,'{"reason":"'..(err or"error_01")..'"}') - end - elseif length==127 then - local b={byte(SOCK:receive(8),1,8)} - length=shl(b[5],24)+shl(b[6],16)+shl(b[7],8)+b[8] - end - - if length>0 then - --Receive data - local s,_,p=SOCK:receive(length) - if s then - res=s - elseif p then--UNF head - UFF=true - sBuffer=sBuffer..p - length=length-#p - break - end - else - res="" - end - else - local s,_,p=SOCK:receive(length) - if s then - sBuffer=sBuffer..s - length=length-#s - elseif p then - sBuffer=sBuffer..p - length=length-#p - end - if length==0 then - res,sBuffer=sBuffer,"" - UFF=false - else - break - end + while CHN_getCount(sendCHN)>=2 do + _send(CHN_pop(sendCHN),CHN_pop(sendCHN)) end + YIELD() + end + error("break") +end) + +local function _receive(sock,len) + local buffer="" + while true do + local r,e,p=sock:receive(len) + if r then + buffer=buffer..r + len=len-#r + elseif p then + buffer=buffer..p + len=len-#p + elseif e then + return nil,e + end + if len==0 then + return buffer + end + YIELD() + end +end +local readThread=coroutine.wrap(function() + local res,err + local op,fin + local lBuffer=""--Long multi-pack buffer + while true do + --Byte 0-1 + res,err=_receive(SOCK,2) + assert(not err,err) + + op=band(byte(res,1),0x0f) + fin=band(byte(res,1),0x80)==0x80 + + --Calculating data length + local length=band(byte(res,2),0x7f) + if length==126 then + res,err=_receive(SOCK,2) + assert(not err,err) + length=shl(byte(res,1),8)+byte(res,2) + elseif length==127 then + local lenData + lenData,err=_receive(SOCK,8) + assert(not err,err) + local _,_,_,_,_5,_6,_7,_8=byte(lenData,1,8) + length=shl(_5,24)+shl(_6,16)+shl(_7,8)+_8 + end + res,err=_receive(SOCK,length) + assert(not err,err) --React if op==8 then--8=close - CHN_push(readCHN,op) - SOCK:close() + CHN_push(readCHN,8)--close if type(res)=='string'then - CHN_push(readCHN,res:sub(3))--Warning: with 2 bytes close code + CHN_push(readCHN,res:sub(3))--Warning: 2 bytes close code at start so :sub(3) else - CHN_push(readCHN,"WS Error") + CHN_push(readCHN,"WS closed") end + return elseif op==0 then--0=continue lBuffer=lBuffer..res if fin then CHN_push(readCHN,lBuffer) lBuffer="" - else end else CHN_push(readCHN,op) if fin then CHN_push(readCHN,res) + lBuffer="" else - sBuffer=res + lBuffer=res end end + YIELD() end -end \ No newline at end of file + error("break") +end) + +local success,err + +while true do--Running + CHN_demand(triggerCHN) + success,err=pcall(sendThread) + if not success or err then break end + success,err=pcall(readThread) + if not success or err then break end +end + +SOCK:close() +CHN_push(readCHN,8)--close +CHN_push(readCHN,err or"Disconnected") \ No newline at end of file diff --git a/parts/net.lua b/parts/net.lua index 413258e6..12ca03ce 100644 --- a/parts/net.lua +++ b/parts/net.lua @@ -63,11 +63,6 @@ function NET.getlock(name) return TIME()