TP5.0.24+Workerman+定时任务
1.安装 Workerman
安装GatewayWorker内核文件(不包含start_gateway.php start_businessworker.php等启动入口文件),直接上composer
composer require workerman/gateway-worker
2.创建 Workerman 启动文件
创建一个自定义命令类文件来启动 Socket 服务端,新建
application/push/command/Workerman.php
namespace app\push\command;use Workerman\Worker;use GatewayWorker\Register;use GatewayWorker\BusinessWorker;use GatewayWorker\Gateway;use think\console\Command;use think\console\Input;use think\console\input\Argument;use think\console\input\Option;use think\console\Output;class Workerman extends Command{ protected function configure() { $this->setName('workerman') ->addArgument('action', Argument::OPTIONAL, "action start|stop|restart") ->addArgument('type', Argument::OPTIONAL, "d -d") ->setDescription('workerman chat'); } protected function execute(Input $input, Output $output) { global $argv; $action = trim($input->getArgument('action')); $type = trim($input->getArgument('type')) ? '-d' : ''; $argv[0] = 'chat'; $argv[1] = $action; $argv[2] = $type ? '-d' : ''; $this->start(); } private function start() { $this->startGateWay(); $this->startBusinessWorker(); $this->startRegister(); Worker::runAll(); } private function startBusinessWorker() { // bussinessWorker 进程 $worker = new BusinessWorker(); // worker名称 $worker->name = 'YourAppBusinessWorker'; // bussinessWorker进程数量 $worker->count = 4; //设置处理业务的类,此处制定Events的命名空间 $worker->eventHandler= \app\push\controller\Events::class; // 服务注册地址 $worker->registerAddress = '127.0.0.1:1238'; } private function startGateWay() { // gateway 进程,这里使用Text协议,可以用telnet测试 $gateway = new Gateway("websocket://0.0.0.0:8282"); // gateway名称,status方便查看 $gateway->name = 'YourAppGateway'; // gateway进程数 $gateway->count = 4; // 本机ip,分布式部署时使用内网ip $gateway->lanIp = '127.0.0.1'; // 内部通讯起始端口,假如$gateway->count=4,起始端口为4000 // 则一般会使用4000 4001 4002 4003 4个端口作为内部通讯端口 $gateway->startPort = 20003; // 服务注册地址 $gateway->registerAddress = '127.0.0.1:1238'; // 心跳间隔 $gateway->pingInterval = 55; $gateway->pingNotResponseLimit = 1; // 心跳数据 $gateway->pingData = ''; } private function startRegister() { new Register('text://0.0.0.0:1238'); }}
配置 application/command.php 文件
return [ 'app\common\command\Workerman',];
3.创建事件监听文件
创建 application/push/controller/Events.php 文件来监听处理 workerman 的各种事件。
<?phpnamespace app\push\controller;use GatewayWorker\Lib\Gateway;use think\Hook;use Workerman\Lib\Timer;class Events{ //定时器间隔 protected static $interval = 2; //定时器 protected static $timer = null; //事件处理类 protected static $evevtRunClass = \app\push\controller\EvevtRun::class; /* * 消息事件回调 class * * */ protected static $eventClassName = \app\push\controller\Push::class; /** * 当客户端发来消息时触发 * @param int $client_id 连接id * @param mixed $message 具体消息 */ public static function onMessage($client_id, $message) { $message_data = json_decode($message,true); if (!$message_data) return ; try{ if(!isset($message_data['type'])) throw new \Exception('缺少消息参数类型'); //消息回調处理 $evevtName = self::$eventClassName.'::instance'; if(is_callable($evevtName)) $evevtName()->start($message_data['type'],$client_id,$message_data); else throw new \Exception('消息处理回调不存在。['+$evevtName+']'); }catch (\Exception $e){ var_dump([ 'file'=>$e->getFile(), 'code'=>$e->getCode(), 'msg'=>$e->getMessage(), 'line'=>$e->getLine() ]); } } /** * 当用户连接时触发的方法 * @param integer $client_id 连接的客户端 * @return void */ public static function onConnect($client_id) { Gateway::sendToClient($client_id, json_encode(array( 'type' => 'init', 'client_id' => $client_id ))); } /** * 当用户断开连接时触发的方法 * @param integer $client_id 断开连接的客户端 * @return void */ public static function onClose($client_id) { Gateway::sendToClient($client_id,json_encode([ 'type'=>'logout', 'message'=>"client[$client_id]" ])); } /** * 当进程启动时 * @param integer $businessWorker 进程实例 */ public static function onWorkerStart($worker) { //在进程1上开启定时器 每self::$interval秒执行 if($worker->id === 0){ $last = time(); $task = [6 => $last, 10 => $last, 30 => $last, 60 => $last, 180 => $last, 300 => $last]; self::$timer = Timer::add(self::$interval, function() use(&$task) { try { $now = time(); Hook::exec(self::$evevtRunClass); foreach ($task as $sec => &$time) { if (($now - $time) >= $sec) { $time = $now; Hook::exec(self::$evevtRunClass,'task_'.$sec); } } } catch (\Throwable $e) {} }); } } /** * 当进程关闭时 * @param integer $businessWorker 进程实例 */ public static function onWorkerStop($worker) { if($worker->id === 0) Timer::del(self::$timer); }}
消息事件回调 class 方法里的处理根据自身情况编写
<?phpnamespace app\push\controller;use app\wap\model\live\LiveUser;use GatewayWorker\Lib\Gateway;use app\wap\model\live\LiveHonouredGuest;use app\wap\model\user\User;use app\wap\model\live\LiveBarrage;class Push{ /* * @var array 消息内容 * */ protected $message_data = [ 'type' => '', 'message'=>'', ]; /* * @var string 消息类型 * */ protected $message_type = ''; /* * @var string $client_id * */ protected $client_id = ''; /* * @var int 当前登陆用户 * */ protected $uid = null; /* * @var null 本类实例化结果 * */ protected static $instance = null; /* * * */ protected function __construct($message_data = []) { } /* * 实例化本类 * */ public static function instance() { if(is_null(self::$instance)) self::$instance = new static(); return self::$instance; } /* * 检测参数并返回 * @param array || string $keyValue 需要提取的键值 * @param null || bool $value * @return array; * */ protected function checkValue($keyValue = null,$value = null) { if(is_null($keyValue)) $message_data = $this->message_data; if(is_string($keyValue)) $message_data = isset($this->message_data[$keyValue]) ? $this->message_data[$keyValue] : (is_null($value) ? '': $value); if(is_array($keyValue)) $message_data = array_merge($keyValue,$this->message_data); if(is_bool($value) && $value === true && is_array($message_data) && is_array($keyValue)){ $newData = []; foreach ($keyValue as $key => $item){ $newData [] = $message_data[$key]; } return $newData; } return $message_data; } /* * 开始设置回调 * @param string $typeFnName 回调函数名 * @param string $client_id * @param array $message_data * * */ public function start($typeFnName,$client_id,$message_data) { $this->message_type = $typeFnName; $this->message_data = $message_data; $this->client_id = $client_id; $this->uid = Gateway::getUidByClientId($client_id); //记录用户上线 if($this->uid && Gateway::isOnline($client_id) && ($live_id = $this->checkValue('room'))) { LiveUser::setLiveUserOnline($live_id,$this->uid,1); } if(method_exists($this,$typeFnName)) call_user_func([$this,$typeFnName]); else throw new \Exception('缺少回调方法'); } /* * 心跳检测 * * */ protected function ping() { return ; } /* * 绑定用户相应客户端 * @param string $client_id * @param array $message_data * @return * */ protected function handshake() { $message_data = $this->checkValue(['uid'=>0,'room'=>0]); if(!$message_data['uid']) throw new \Exception("缺少用户uid,无法绑定用户"); $new_message = [ 'type' => $this->message_type, 'client_id' => $this->client_id, 'time' => date('H:i:s'), 'msg' => '绑定成功!' ]; Gateway::bindUid($this->client_id,$message_data['uid']); //如果有群组id加入群组 if($message_data['room']){ // 加入某个群组(可调用多次加入多个群组) 将clientid加入roomid分组中 Gateway::joinGroup($this->client_id, $message_data['room']); } Gateway::sendToClient($this->client_id, json_encode($new_message)); } /* * 接受客户端发送的消息 * @param string $client_id 客户端client_id * @param array $message_data 发送的数据 * @return * * */ protected function send() { list($toUid,$message,$room,$type) = $this->checkValue(['uid'=>0,'content'=>'','room'=>false,'ms_type' => 0],true); $client_id = $this->client_id; if(!$this->uid) { //认证用户信息失败,关闭用户链接 Gateway::closeClient($client_id); throw new \Exception("缺少用户uid"); } $userInfo = User::get($this->uid); if(!$userInfo){ //认证用户信息失败,关闭用户链接 Gateway::closeClient($client_id); throw new \Exception("用户信息缺少"); } if($room && Gateway::getClientIdCountByGroup($room)){ $user_type = LiveHonouredGuest::where(['uid'=>$this->uid,'live_id'=>$room])->value('type'); if(is_null($user_type)) $user_type = 2; $res = LiveBarrage::set([ 'live_id'=>$room, 'uid'=>$this->uid, 'type'=>$type, 'barrage'=>$message, 'add_time'=>time(), 'is_show'=>1 ]); if(!$res) throw new \Exception("写入历史记录失败"); Gateway::sendToGroup($room,json_encode([ 'message'=>$message, 'm_type'=>$type, 'type'=>'message', 'user_type'=>$user_type, 'userInfo'=>$userInfo, 'id'=>$res['id'] ])); }else{ $new_message = [ 'type' => 'reception', 'content' => $message, 'time' => date('H:i:s'), 'timestamp' => time(), ]; if(Gateway::isUidOnline($toUid)) return Gateway::sendToUid($toUid, json_encode($new_message)); } } /* * 消息撤回 * @param string $client_id * @param array $message_data * */ protected function recall() { list($id,$room) = $this->checkValue(['id'=>0,'room'=>''],true); if(!$id) throw new \Exception('缺少撤回消息的id'); if(!$room) throw new \Exception('缺少房间号'); if(LiveBarrage::del($id)){ Gateway::sendToGroup($room,json_encode([ 'type'=>'recall', 'id'=>$id ]),Gateway::getClientIdByUid($this->uid)); } }}
定时任务事件处理类 按照自身情况编写方法内逻辑
<?phpnamespace app\push\controller;use GatewayWorker\Lib\Gateway;/* * 定时任务 * * */class EvevtRun{ /* * 默认定时器执行事件 * */ public function run() { } /* * 每隔6秒执行 * */ public function task_6() { } /* * 每隔10秒执行 * */ public function task_10() { } /* * 每隔30秒执行 * */ public function task_30() { } /* * 每隔60秒执行 * */ public function task_60() { } /* * 每隔180秒执行 * */ public function task_180() { } /* * 每隔300秒执行 * */ public function task_300() { }}
4.启动 Workerman 服务端
以debug(调试)方式启动
以debug(调试)方式启动php think workerman start//以daemon(守护进程)方式启动php think workerman start d//停止php think workerman stop//重启php think workerman restart//平滑重启php think workerman reload//查看状态php think workerman status//当你看到如下结果的时候,workerman已经启动成功了。Workerman[chat] start in DEBUG mode----------------------- WORKERMAN -----------------------------Workerman version:3.5.11 PHP version:7.0.29------------------------ WORKERS -------------------------------user worker listen processes statustegic Gateway websocket://0.0.0.0:8282 4 [OK]tegic BusinessWorker none 1 [OK]tegic Register text://0.0.0.0:1236 4 [OK]----------------------------------------------------------------Press Ctrl+C to stop. Start success.
5.客户端连接使用
socket.ws.send()调用可发送消息,socket.onmessage 内是处理消息类型,即可实现长链接
(function (global) { var socketDebug = window.socketDebug == undefined ? false : window.socketDebug; var socket = { ws:null, connect:function () { var that= this; that.ws = new WebSocket("ws://"+document.domain+":"+window.workermanConfig.port);//这里如果使用127.0.0.1或者localhost会出现连接失败。当时为了方便以后的维护,这里在php的全局文件里定义了一个常量来定义ip,后来本地开发完提交到linux服务器环境之后发现链接失败!按照此行代码会有效连接~ that.ws.onopen = this.onopen; that.ws.onmessage = this.onmessage; that.ws.onclose = function(e) { socketDebug && console.log("连接关闭,定时重连"); that.connect(); }; that.ws.onerror = function(e) { socketDebug && console.log("出现错误"); }; }, onopen:function () { var joint = '{"type":"handshake","role":"user","uid":'+window.uid+',"room":'+window.room+'}'; socket.ws.send(joint); socket.heartCheck.start(); }, sendMsg:function(content,type,id){ socket.ws.send("{content:'"+content+"',m_type:'"+type+"',room:"+id+",type:'send'}") }, onmessage:function (e) { try { var data = JSON.parse(e.data); socketDebug && console.log(data) switch(data.type){ case 'init': break; // 服务端ping客户端 case 'ping': break; // 登录 更新用户列表 case 'handshake': break; // 提醒 case 'reception': break; //直播进行中 case 'live_ing': break; //直播结束 case 'live_end': break; //消息提醒 case 'message': break; //消息撤回 case 'recall': break; case 'ban': break; } }catch (e) { socketDebug && console.info(e); } }, heartCheck:{ timeout: 3000, timeoutObj: null, start: function(){ this.timeoutObj = setInterval(function(){ socket.ws.send("{'type':'ping'}"); }, this.timeout); } } }; window.onload=function () { socket.connect(); }; global.socket = socket; return socket}(this));
windows 版本无法启动,已经在商城项目中使用
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。