์ต๊ทผ ์งํํ ๊ฒ ์ค์ ์ ์ฉํ๊ฒ ์ ์ฌ์ฉํ paho-mqtt option์ด ์์ด์ ๊ธฐ๋กํ๋ค.
์๋์ ๊ฐ์ mqtt ๋ฅผ ๊ฐ๋ฐํ๊ณ ์์๋ค
mqtt ํต์ ์ ์ ๋ขฐ์ฑ์ ๋์ด๊ธฐ ์ํด์ main client ์์ sub client ์ ์ฐ๊ฒฐ ์ํ(online/offline)๋ฅผ ์ค์๊ฐ์ผ๋ก ํ์ธํ๊ณ ์ถ์๋ค.
๊ทธ๋์ qos, retain์ ์ค์ ํ๊ณ , will_set๋ ๋ฑ๋กํ๋ค
์๋ ์์๋ฅผ ๋ณด๋ฉฐ ์ด๋ป๊ฒ ์ฌ์ฉํ๋์ง ํ์ธํด๋ณด์!
โจ ์ฌ๊ธฐ์ ์๋ก ์๊ฒ๋ ์ฉ์ด : presence detection
→ IoT, MQTT, ์ฑํ ์์คํ ์ ๋ง์ด ์ฐ์ด๋ ๊ฐ๋ ์ด๋ค.
→ ํ์ฌ ๋๊ฐ ์ฐ๊ฒฐ๋์๋์ง(online), ๋๊ฐ ์ฐ๊ฒฐ์ด ๋์ด์ก๋์ง(offline)์ ๋ํ ์ํ๋ฅผ ์์คํ ์ด ์ค์๊ฐ์ผ๋ก ํ์ ํ๋ ๊ธฐ๋ฅ์ด๋ค.
→ "ํด๋ผ์ด์ธํธ์ ์ ์ ์ํ ๊ฐ์ง" ๋ผ๊ณ ์๊ฐํ๋ฉด ๋ ๋ฏ !
์ฐ์ , sub client์์๋ {"client1": "online"} ์ด๋ฐ ํ์์ผ๋ก publish ํ ์์ !!
Main Client ์ธก ์คํฌ๋ฆฝํธ์๋ ์ด๋ ๊ฒ ์ถ๊ฐํ๋ค.
self.status_map = {}
def on_connect(self, client, userdata, flags, rc):
...
self.subscribe("status/#")
logger.info("[MQTT] Subscribe 'agents/status'!")
...
def on_message(self, client, userdata, msg):
...
if msg.topic.startswith('status'):
self._handle_status_update(message)
...
def _handle_status_update(self, message):
agent = message.get("agent")
status = message.get("status")
if agent and status:
self.status_map[agent] = status
logger.info(f"[STATUS] {agent} → {status}")
logger.info(f'[STATUS_MAP] status_map: {self.status_map}')
def get_agent_status(self, agent):
return self.status_map.get(agent, "unknown")
def is_agent_online(self, agent):
return self.get_agent_status(agent) == "online"
(1) status_map : ์ฐ๊ฒฐ๋ agents ์ํ๋ฅผ ๋ด์ dictionary
(2) on_connect()
- mqtt ์ฐ๊ฒฐ ์๋ฃ ํ "status/#" ๊ตฌ๋ ์์
- "status/#" : "status/" ์ผ๋ก ์์ํ๋ ๋ชจ๋ topic์ ๊ตฌ๋
(3) on_message()
- message๋ฅผ ๋ฐ์ผ๋ฉด _handle_status_update ์คํ
(4) _handle_status_update()
- status_map์ด๋ผ๋ dict์ {“agent1”: “online”, “agent2”: “offline”} ํ์์ผ๋ก ์ํ๋ฅผ ์ ์ฅ
(5) is_agent_online()
- ๋์ค์ agent ์ฐ๊ฒฐ ํ์ธํ ๋ ์ฌ์ฉํ ํจ์
sub client ์ธก์๋ online/offline ์ publish ํ๋๋ก ์ถ๊ฐํ๋ค.
1๏ธโฃ connect ๋๋ฉด online ์ ์ก
def on_connect(self, client, userdata, flags, rc):
...
online_message = {self.client_id: "online"}
self.client.publish("agents/status", json.dumps(online_message), qos=1, retain=True)
2๏ธโฃ ์ ์์ ์ธ disconnect ์์ offline ์ ์ก
def on_disconnect(self, client, userdata, flags, rc):
offline_message = {self.client_id: "offline"}
self.client.publish("agents/status", json.dumps(offline_message), qos=1, retain=True)
3๏ธโฃ ๋น์ ์์ ์ธ disconnect ์์ offline ์ ์ก
client.connect(broker_host, broker_port)
will_message = {client_id: "offline"}
client.will_set("agents/status", payload=json.dumps(will_message), qos=1, retain=True)
* will_set : client๊ฐ ๋น์ ์์ ์ผ๋ก disconnect ๋์์ ๋ broker๊ฐ ๋์ publishํ๋ message
โญ๏ธ qos=1, retain=True
- ์ค์ ์ ์ถ๊ฐํ ์ด์
- sub client๊ฐ ๋จผ์ mqtt ์ฐ๊ฒฐ๋๊ณ ๋์ Main Client๊ฐ "status/#" ๊ตฌ๋ ์ ํ๊ฒ๋๋ฉด, sub client๋ค์ ์ํ๋ฅผ ๋ฐ์ ์ ์๋ค.
- retain=True
- broker์ ๋ง์ง๋ง ๋ฉ์์ง๋ฅผ ์ ์ฅ(retain) ํ๋ผ๊ณ ์๋ ค์ฃผ๋ flag
- ์๋ก์ด subscriber๊ฐ ๊ตฌ๋ ํ๋ฉด broker๋ ์ ์ฅ๋ ๋ฉ์์ง๋ฅผ ์ฆ์ ์ ๋ฌํ๋ค.
- qos=1
- ๋ฉ์์ง๊ฐ ์ต์ 1ํ ์ด์ ์ ๋ฌ๋จ (at least once)
- publish → broker → subscriber ๊ฐ ACK(ํ์ธ ์๋ต) ์ ์ฃผ๊ณ ๋ฐ์
- ๋ฉ์์ง๋ฅผ ์ ์ฅํ๋ ๊ณผ์ (publish → broker)์ ์คํจํ์ง ์๊ฒ ํ๊ณ ์ถ๋ค๋ฉด qos=1 ์ด์ ์ฐ๋๊ฒ ์ ๋ฆฌ.
- ๋คํธ์ํฌ๊ฐ ์์ ์ ์ด๋ฉด qos=0 ์ผ๋ก ํด๋ ๊ด์ฐฎ์ผ๋, presence detection(ํด๋ผ์ด์ธํธ์ ์ ์ ์ํ ๊ฐ์ง, online/offline) ์์๋ ์ค์๊ฐ์ฑ์ด ์ค์ํ๊ณ ์ต์ ์ ๋ณด๊ฐ broker์ ์ ์ ์ฅ๋์ด์ผ ํ๋๊น qos=1 ์ผ๋ก ์ค์
- qos=2๋ ๋๋ฌด ๋ฌด๊ฒ๋ค
'๐ฉ๐ปโ๐ป > mqtt' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[MQTT] Keep Alive (0) | 2025.03.12 |
---|---|
[MQTT] pc 2๊ฐ ํต์ (python, ์๋ฐฉํฅ ํต์ ) (0) | 2025.02.25 |
[MQTT] Mac mosquitto ์ค์น / ์คํ / ์ค์ง (0) | 2025.02.25 |
[MQTT] python ์ผ๋ก sub / pub ๊ตฌํํ๊ธฐ (1) | 2024.10.28 |
MQTT์ Kakfa ๋น๊ต (2) | 2024.10.23 |