Blame view

src/core/pyros_django/api/consumers.py 5.36 KB
6f83043c   Alexis Koralewski   Add websocket for...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# consumers.py
from django.contrib.auth.models import User
from .serializers import AgentCmdSerializer, AgentSurveySerializer
from common.models import AgentCmd, AgentSurvey
from asgiref.sync import async_to_sync
import json
from channels.db import database_sync_to_async
from .functions import get_agent_survey_instance, get_list_agent_cmd

from channels.generic.websocket import AsyncWebsocketConsumer


class AgentCmdConsumer(AsyncWebsocketConsumer):
    """Websocket consumer pushing the AgentCmd objects of one agent.

    Each connection joins the ``agentcmd_<agent_name>_observers`` group.
    Group events of type ``send_agentcmd_instance`` are turned into a JSON
    text frame ``{"data": ..., "action": ...}`` sent to the client.
    """

    @property
    def _observers_group(self):
        """Name of the channel-layer group shared by observers of this agent."""
        return f'agentcmd_{self.agent_name}_observers'

    async def connect(self):
        """Join the observers group, accept the socket and request the list."""
        self.agent_name = self.scope["url_route"]["kwargs"]["agent_name"]
        # Join room group
        await self.channel_layer.group_add(self._observers_group, self.channel_name)
        await self.accept()
        # NOTE: this is a group broadcast, so observers that are already
        # connected for this agent receive the command list again as well.
        await self.channel_layer.group_send(
            self._observers_group,
            {"type": "send_agentcmd_instance", "data": None, "action": "list"},
        )

    async def disconnect(self, close_code):
        """Leave the observers group once the socket is closed."""
        # connect() may have failed before the agent name was stored.
        if not hasattr(self, "agent_name"):
            return
        await self.channel_layer.group_discard(self._observers_group, self.channel_name)

    def get_agentcmd(self, id):
        """Return the serialized AgentCmd whose ``id`` field equals ``id``.

        Synchronous ORM access: call it through ``database_sync_to_async``
        from the async handlers.
        """
        return AgentCmdSerializer(AgentCmd.objects.get(id=id)).data

    # Receive message from WebSocket
    async def receive(self, text_data=None, bytes_data=None):
        """Notify the observers group that the client sent a text frame.

        Binary frames are ignored. The text frame must be valid JSON, but its
        content is not used.
        """
        if text_data is None:
            return
        json.loads(text_data)  # validation only: the payload is not used

        # Send message to room group
        await self.channel_layer.group_send(
            self._observers_group,
            {"type": "new_agentcmd_instance", "agent_name": self.agent_name, "action": "create"},
        )

    async def new_agentcmd_instance(self, event):
        """Handle the group event emitted by ``receive``.

        Without this handler the ``new_agentcmd_instance`` event type had no
        matching method on the consumer. The event carries no command id, so
        the refreshed command list is pushed to the client instead.
        """
        await self.send_agentcmd_instance({"data": None, "action": "list"})

    # ON Receive, send data to observers
    async def send_agentcmd_instance(self, event):
        """Send AgentCmd data to the client according to ``event["action"]``.

        ``"list"`` sends the result of ``get_list_agent_cmd`` for this agent;
        ``"create"`` / ``"update"`` send the serialized AgentCmd whose id is
        given in ``event["data"]``; any other action forwards ``event["data"]``
        unchanged.
        """
        data = event.get("data")
        action = event["action"]
        if action == "list":
            data = await database_sync_to_async(get_list_agent_cmd)(self.agent_name)
        elif action in ("create", "update"):
            # For these actions "data" holds the id of the AgentCmd to serialize.
            data = await database_sync_to_async(self.get_agentcmd)(data)
        # Send message to WebSocket
        await self.send(text_data=json.dumps({"data": data, "action": action}))



class AgentSurveyDetailConsumer(AsyncWebsocketConsumer):
    """Websocket consumer pushing the AgentSurvey of a single agent.

    Each connection joins the ``agentsurvey_<agent_name>_observers`` group.
    Group events of type ``send_agentsurvey_instance`` make the consumer send
    the serialized AgentSurvey to the client as ``{"data": ...}``.
    """

    @property
    def _observers_group(self):
        """Name of the channel-layer group shared by observers of this agent."""
        return f'agentsurvey_{self.agent_name}_observers'

    async def connect(self):
        """Join the observers group, accept the socket and request the survey."""
        self.agent_name = self.scope["url_route"]["kwargs"].get("agent_name")
        # Join room group
        await self.channel_layer.group_add(self._observers_group, self.channel_name)
        await self.accept()
        # NOTE: this is a group broadcast, so observers that are already
        # connected for this agent receive the survey again as well.
        await self.channel_layer.group_send(
            self._observers_group,
            {"type": "send_agentsurvey_instance", "data": None},
        )

    async def disconnect(self, close_code):
        """Leave the observers group once the socket is closed."""
        # connect() may have failed before the agent name was stored.
        if not hasattr(self, "agent_name"):
            return
        await self.channel_layer.group_discard(self._observers_group, self.channel_name)

    def get_agent_survey_instance(self):
        """Return the serialized AgentSurvey whose ``name`` is this agent's name.

        Synchronous ORM access: call it through ``database_sync_to_async``
        from the async handlers.
        """
        agent_survey = AgentSurvey.objects.get(name=self.agent_name)
        return AgentSurveySerializer(agent_survey).data

    # Receive message from WebSocket
    async def receive(self, text_data=None, bytes_data=None):
        """Notify the observers group that the client sent a text frame.

        Binary frames are ignored. The text frame must be valid JSON, but its
        content is not used.
        """
        if text_data is None:
            return
        json.loads(text_data)  # validation only: the payload is not used

        # Send message to room group
        await self.channel_layer.group_send(
            self._observers_group,
            {"type": "new_agentsurvey_instance", "agent_name": self.agent_name, "action": "create"},
        )

    async def new_agentsurvey_instance(self, event):
        """Handle the group event emitted by ``receive``.

        Without this handler the ``new_agentsurvey_instance`` event type had
        no matching method on the consumer. The current survey is pushed to
        the client, exactly as for ``send_agentsurvey_instance``.
        """
        await self.send_agentsurvey_instance(event)

    # Receive message from room group
    async def send_agentsurvey_instance(self, event):
        """Send the current serialized AgentSurvey to the client.

        The content of ``event`` is not used.
        """
        data = await database_sync_to_async(self.get_agent_survey_instance)()
        # Send message to WebSocket
        await self.send(text_data=json.dumps({"data": data}))


class AgentSurveyConsumer(AsyncWebsocketConsumer):
    """Websocket consumer pushing the data returned by ``get_agent_survey_instance``.

    Each connection joins the ``agentsurvey_observers`` group. Group events of
    type ``send_agentsurvey_instance`` make the consumer send that data to the
    client as ``{"data": ...}``.
    """

    # Channel-layer group shared by every connection of this consumer.
    _OBSERVERS_GROUP = 'agentsurvey_observers'

    async def connect(self):
        """Join the observers group, accept the socket and request the data."""
        # Join room group
        await self.channel_layer.group_add(self._OBSERVERS_GROUP, self.channel_name)
        await self.accept()
        # NOTE: this is a group broadcast, so observers that are already
        # connected receive the data again as well.
        await self.channel_layer.group_send(
            self._OBSERVERS_GROUP,
            {"type": "send_agentsurvey_instance", "data": None},
        )

    async def disconnect(self, close_code):
        """Leave the observers group once the socket is closed."""
        await self.channel_layer.group_discard(self._OBSERVERS_GROUP, self.channel_name)

    # Receive message from WebSocket
    async def receive(self, text_data=None, bytes_data=None):
        """Notify the observers group that the client sent a text frame.

        Binary frames are ignored. The text frame must be valid JSON, but its
        content is not used.
        """
        if text_data is None:
            return
        json.loads(text_data)  # validation only: the payload is not used

        # Send message to room group.
        # This consumer is not bound to an agent: it never sets
        # ``self.agent_name``, so the event must not read that attribute.
        await self.channel_layer.group_send(
            self._OBSERVERS_GROUP,
            {"type": "new_agentsurvey_instance", "action": "create"},
        )

    async def new_agentsurvey_instance(self, event):
        """Handle the group event emitted by ``receive``.

        Without this handler the ``new_agentsurvey_instance`` event type had
        no matching method on the consumer. The current data is pushed to the
        client, exactly as for ``send_agentsurvey_instance``.
        """
        await self.send_agentsurvey_instance(event)

    # Receive message from room group
    async def send_agentsurvey_instance(self, event):
        """Send the result of ``get_agent_survey_instance()`` to the client.

        The content of ``event`` is not used.
        """
        data = await database_sync_to_async(get_agent_survey_instance)()
        # Send message to WebSocket
        await self.send(text_data=json.dumps({"data": data}))