consumers.py
5.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# consumers.py
from django.contrib.auth.models import User
from .serializers import AgentCmdSerializer, AgentSurveySerializer
from common.models import AgentCmd, AgentSurvey
from asgiref.sync import async_to_sync
import json
from channels.db import database_sync_to_async
from .functions import get_agent_survey_instance, get_list_agent_cmd
from channels.generic.websocket import AsyncWebsocketConsumer
class AgentCmdConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.agent_name = self.scope["url_route"]["kwargs"]["agent_name"]
# Join room group
await self.channel_layer.group_add(f'agentcmd_{self.agent_name}_observers', self.channel_name)
await self.accept()
# try:
# agent_cmds = await database_sync_to_async(get_list_agent_cmd)(self.agent_name)
# except Exception as e:
# print(e)
await self.channel_layer.group_send(
f'agentcmd_{self.agent_name}_observers', {"type": "send_agentcmd_instance", "data": None,"action":"list"}
)
def get_agentcmd(self,id):
return AgentCmdSerializer(AgentCmd.objects.get(id=id)).data
# async def disconnect(self, close_code):
# # Leave room group
# await self.channel_layer.group_discard(f'agentcmd_{self.agent_name}_observers', self.channel_name)
# await self.close(close_code)
# Receive message from WebSocket
async def receive(self, text_data):
text_data_json = json.loads(text_data)
message = text_data_json["message"]
# Send message to room group
await self.channel_layer.group_send(
f'agentcmd_{self.agent_name}_observers', {"type": "new_agentcmd_instance", "agent_name": self.agent_name,"action":"create"}
)
# ON Receive, send data to observers
async def send_agentcmd_instance(self, event):
data = event.get("data",None)
action = event["action"]
if action == "list":
data = await database_sync_to_async(get_list_agent_cmd)(self.agent_name)
elif action == "create" or action == "update":
id = data
data = await database_sync_to_async(self.get_agentcmd)(id)
# Send message to WebSocket
await self.send(text_data=json.dumps({"data": data,"action":action}))
class AgentSurveyDetailConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.agent_name = self.scope["url_route"]["kwargs"].get("agent_name")
print(self.agent_name)
# Join room group
await self.channel_layer.group_add(f'agentsurvey_{self.agent_name}_observers', self.channel_name)
await self.accept()
# try:
# agent_survey = await database_sync_to_async(self.get_agent_survey_instance)()
# except Exception as e:
# print("exception websocket : ",e)
await self.channel_layer.group_send(
f'agentsurvey_{self.agent_name}_observers', {"type": "send_agentsurvey_instance", "data": None}
)
def get_agent_survey_instance(self):
agent_survey = AgentSurvey.objects.get(name=self.agent_name)
return AgentSurveySerializer(agent_survey).data
# async def disconnect(self, close_code):
# # Leave room group
# await self.channel_layer.group_discard(f'agentsurvey_{self.agent_name}_observers', self.channel_name)
# await self.close(close_code)
# Receive message from WebSocket
async def receive(self, text_data):
text_data_json = json.loads(text_data)
message = text_data_json["message"]
# Send message to room group
await self.channel_layer.group_send(
f'agentsurvey_{self.agent_name}_observers', {"type": "new_agentsurvey_instance", "agent_name": self.agent_name,"action":"create"}
)
# Receive message from room group
async def send_agentsurvey_instance(self, event):
data = await database_sync_to_async(self.get_agent_survey_instance)()
# Send message to WebSocket
await self.send(text_data=json.dumps({"data": data}))
class AgentSurveyConsumer(AsyncWebsocketConsumer):
async def connect(self):
# Join room group
await self.channel_layer.group_add(f'agentsurvey_observers', self.channel_name)
await self.accept()
# try:
# agent_survey = await database_sync_to_async(get_agent_survey_instance)()
# except Exception as e:
# print(e)
await self.channel_layer.group_send(
f'agentsurvey_observers', {"type": "send_agentsurvey_instance", "data": None}
)
# async def disconnect(self, close_code):
# # Leave room group
# await self.channel_layer.group_discard(f'agentsurvey_observers', self.channel_name)
# await self.close(close_code)
# Receive message from WebSocket
async def receive(self, text_data):
text_data_json = json.loads(text_data)
message = text_data_json["message"]
# Send message to room group
await self.channel_layer.group_send(
f'agentsurvey_observers', {"type": "new_agentsurvey_instance", "agent_name": self.agent_name,"action":"create"}
)
# Receive message from room group
async def send_agentsurvey_instance(self, event):
data = await database_sync_to_async(get_agent_survey_instance)()
# Send message to WebSocket
await self.send(text_data=json.dumps({"data": data}))