一、背景
在最近的項目中的一個需求是消息實時推送消息以及通知功能,項目使用django寫的所以決定采用django-channels來實現websocket進行實時通訊。目前官方已經更新到2.1版本,相對于老的channels 1.x版本有了很大變化,無論是使用方式還是功能,其中最大的變化莫過于2.x版本中帶來的asyncio特性,可使用異步處理模式。本文內容將介紹channels2版本使用,由于項目django是1.11,其中也遇到了一些坑,比如在channels在處理一次請求后hang住然后報錯,后面修改了下django1.11版本的一點源碼得以解決,2.0版本應該不會有問題。
二、channels介紹
channels是以django插件的形式存在,它不僅能處理http請求,還提供對websocket、MQTT等長連接支持。不僅如此,channels在保留了原生django的同步和易用的特性上還帶來了異步處理方式(channels2.X版本),并且將django自帶的認證系統以及session集成到模塊中,擴展性非常強。官方文檔:https://channels.readthedocs.io/en/latest/index.html
三、安裝以及安裝需求
channels2.0最低django版本要求是1.11+,python3.5+。筆者的版本是django1.11,直接安裝可能有問題,以下是測試通過的版本。
筆者的相關版本如下:
Django==1.11.10 channels==2.1.4 channels-redis==2.3.1 asgiref==2.1.6 asgi-redis==1.4.3
如果django版本比較高直接采用pip安裝:
pip3 install channels pip3 install channels-redis #可選的,官方推薦如果使用redis作為channel layer
redis安裝可以參考博客:https://www.gxlcms.com/article/151522.htm
四、開始使用
一、配置settings.py
筆者采用的redis作為channel layer(關于其介紹請移步至https://channels.readthedocs.io/en/latest/topics/channel_layers.html),它是實現消息推送的核心,在項目的settings.py中:
注冊channles app:
INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'cmdb', 'channels', #注冊app ]
配置channels layer:
ASGI_APPLICATION = 'devops.routing.application' CHANNEL_LAYERS = { 'default': { 'BACKEND': 'channels_redis.core.RedisChannelLayer', 'CONFIG': { "hosts": [('10.1.210.33', 6379)], #需修改 }, }, }
二、路由配置
在項目settings文件同級目錄中新增routing.py
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from channels.auth import AuthMiddlewareStack from channels.routing import ProtocolTypeRouter, URLRouter import deploy.routing application = ProtocolTypeRouter({ 'websocket': AuthMiddlewareStack( URLRouter( deploy.routing.websocket_urlpatterns# 指明路由文件是devops/routing.py ) ), })
最后在app里配置路由和對應的消費者,筆者這里是devops下的routing.py:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from django.conf.urls import url from . import consumers websocket_urlpatterns = [ url(r'^ws/deploy/(?P<service_name>[^/]+)/$', consumers.DeployResult), #consumers.DeployResult 是該路由的消費者 ]
項目目錄結構如下:
三、編寫webscoket消息處理方法(消費者)
首先說明,消費者是Channels代碼的基本單元,當一個新的Socket進入的時候,Channels會根據路由表找到正確的消費者,以下代碼中每個方法都可以看作一個消費者,他們消費不同的event,比如剛剛接受連接時候connect方法進行消費處理并接受連接,關閉websocket時候使用disconnect進行消費處理。
deploy/consumers.py:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from channels.generic.websocket import AsyncWebsocketConsumer import json class DeployResult(AsyncWebsocketConsumer): async def connect(self): self.service_uid = self.scope["url_route"]["kwargs"]["service_uid"] self.chat_group_name = 'chat_%s' % self.service_uid # 收到連接時候處理, await self.channel_layer.group_add( self.chat_group_name, self.channel_name ) await self.accept() async def disconnect(self, close_code): # 關閉channel時候處理 await self.channel_layer.group_discard( self.chat_group_name, self.channel_name ) # 收到消息 async def receive(self, text_data): text_data_json = json.loads(text_data) message = text_data_json['message'] print("收到消息--》",message) # 發送消息到組 await self.channel_layer.group_send( self.chat_group_name, { 'type': 'client.message', 'message': message } ) # 處理客戶端發來的消息 async def client_message(self, event): message = event['message'] print("發送消息。。",message) # 發送消息到 WebSocket await self.send(text_data=json.dumps({ 'message': message }))
以上代碼部分說明:
1.self.scope是單個連接傳入的詳細信息,其中包含了請求的session、以及django認證系統中的用戶信息等;
2.async...await 是python3.5之后的新異步特性,基于asyncio模塊;
四、發起webscoket請求
利用js發起websocket請求
function InitWebSocket() { var websocket = new WebSocket( 'ws://' + window.location.host + '/ws/deploy/tasks/' ); websocket.onmessage = function (e) { var data = JSON.parse(e.data); var message = '\n' + data['message']; document.querySelector('#deploy-res').innerText += (message + '\n'); }; }
五、發送消息到channel
無論是消息的推送或者消息的接受,都是經過channel layer進行傳輸,以下是發送消息示例,
from channels.layers import get_channel_layer from asgiref.sync import async_to_sync channel_layer = get_channel_layer() def send_channel_msg(channel_name, msg): """ send msg to channel :param channel_name: :param msg: :return: """ async_to_sync(channel_layer.group_send)(channel_name, {"type": "deploy.run", "text": msg})
六、生產部署
大多數django的應用部署方式都采用的是nginx+uwsgi進行部署,當django集成channels時候,由于uwsgi不能處理websocket請求,所以我們需要asgi服務器來處理websocket請求,官方推薦使用daphne。下一篇文章將介紹nginx+supervisor+daphne+uwsgi進行生產部署。
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com