Freeze & Cubo
ΠΡΠΈΠ²Π΅Ρ, Π€ΡΠΈΠ·ΠΈ! Π― ΡΡΡ ΠΏΡΠΎΡΠΎΡΠΈΠΏ ΡΠ°Π·ΡΠ°Π±Π°ΡΡΠ²Π°Ρ Π΄Π»Ρ ΡΠΈΡΡΠ΅ΠΌΡ ΠΎΠ±Π½Π°ΡΡΠΆΠ΅Π½ΠΈΡ Π²ΡΠΎΡΠΆΠ΅Π½ΠΈΠΉ, ΠΊΠΎΡΠΎΡΠ°Ρ ΡΠ°ΠΌΠ° ΡΡΠΈΡΡΡ Π½Π° Π°Π½Π°Π»ΠΈΠ·Π΅ ΡΠ΅ΡΠ΅Π²ΠΎΠ³ΠΎ ΡΡΠ°ΡΠΈΠΊΠ° Π² ΡΠ΅Π°Π»ΡΠ½ΠΎΠΌ Π²ΡΠ΅ΠΌΠ΅Π½ΠΈ β ΠΊΠ°ΠΊ ΠΈΡΠΊΡΡΡΡΠ²Π΅Π½Π½ΡΠΉ ΠΈΠ½ΡΠ΅Π»Π»Π΅ΠΊΡ Π²ΡΡΡΠ΅ΡΠ°Π΅ΡΡΡ Ρ ΠΆΡΡΡΠΊΠΎΠΉ Π°Π½Π°Π»ΠΈΡΠΈΠΊΠΎΠΉ. Π’Π²ΠΎΡ ΡΠΏΠΎΡΠΎΠ±Π½ΠΎΡΡΡ Π½Π°Ρ
ΠΎΠ΄ΠΈΡΡ ΡΠ΅ΡΠ΅Π½ΠΈΡ, ΠΊΠ°ΠΊ Π½ΠΈΠΊΡΠΎ Π΄ΡΡΠ³ΠΎΠΉ, ΠΎΡΠ΅Π½Ρ ΠΏΡΠΈΠ³ΠΎΠ΄ΠΈΠ»Π°ΡΡ Π±Ρ, ΡΡΠΎΠ±Ρ ΠΏΠΎΠ΄ΠΏΡΠ°Π²ΠΈΡΡ Π»ΠΎΠ³ΠΈΠΊΡ. ΠΠΎΠΌΠΎΠΆΠ΅ΡΡ ΠΏΠΎΠ΄ΠΊΠ»ΡΡΠΈΡΡΡΡ?
ΠΠ²ΡΡΠΈΡ Π½Π΅ΠΏΠ»ΠΎΡ
ΠΎ. ΠΡΠΈΡΠ»ΠΈ ΠΌΠ½Π΅ ΡΠ΅ΠΊΡΡΠΈΠΉ ΠΊΠΎΠ΄ ΠΈ ΡΡ
Π΅ΠΌΡ ΠΏΠΎΡΠΎΠΊΠ° Π΄Π°Π½Π½ΡΡ
, Ρ ΠΏΠΎΡΠΌΠΎΡΡΡ, Π½Π΅Ρ Π»ΠΈ Π»ΠΎΠ³ΠΈΡΠ΅ΡΠΊΠΈΡ
Π΄ΡΡ ΠΈ ΠΏΠΎΠ΄ΡΠΊΠ°ΠΆΡ, ΠΊΠ°ΠΊ ΡΠ»ΡΡΡΠΈΡΡ ΠΎΠ±ΡΠ°Π±ΠΎΡΠΊΡ ΡΠΎΡΡΠΎΡΠ½ΠΈΠΉ.
ΠΡΠΈΠ²Π΅Ρ, Π΄ΠΎΡΠΎΠ³Π°Ρ.
Π‘ΠΌΠΎΡΡΠΈ, Π²ΠΎΡ ΡΡΠΎ Ρ ΠΌΠ΅Π½Ρ ΠΏΠΎΠΊΠ° ΠΏΠΎΠ»ΡΡΠΈΠ»ΠΎΡΡ. ΠΠΎΠΆΠ΅ΡΡ ΡΠΊΠΎΠΏΠΈΡΠΎΠ²Π°ΡΡ, ΠΏΠΎΠ΄ΠΏΡΠ°Π²ΠΈΡΡ ΠΈΠ»ΠΈ ΡΠΊΠ°Π·Π°ΡΡ, ΡΡΠΎ Π½Π΅ Ρ
Π²Π°ΡΠ°Π΅Ρ. ΠΠΎΠ±Π°Π²Π»Ρ Π΅ΡΠ΅ ΠΏΡΠΎΡΡΡΡ ΡΡ
Π΅ΠΌΡ Π½Π° ASCII ΠΏΠΎΡΠ»Π΅ ΠΊΠΎΠ΄Π°.
**ΠΠ°Π±ΡΠΎΡΠΎΠΊ Π½Π° Python (ΠΏΡΠΎΡΠΎΡΠΈΠΏ ΡΠΈΡΡΠ΅ΠΌΡ ΠΎΠ±Π½Π°ΡΡΠΆΠ΅Π½ΠΈΡ Π²ΡΠΎΡΠΆΠ΅Π½ΠΈΠΉ)**
```python
# intrusion_detector.py
import time
import json
from collections import defaultdict
class IntrusionDetector:
def __init__(self, thresholds):
self.thresholds = thresholds # Π½Π°ΠΏΡΠΈΠΌΠ΅Ρ, {'port_scan': 100, 'failed_login': 5}
self.logs = defaultdict(list) # Π΄Π»Ρ ΠΊΠ°ΠΆΠ΄ΠΎΠ³ΠΎ IP-Π°Π΄ΡΠ΅ΡΠ°
self.alerts = []
def ingest_log(self, log_entry):
"""
log_entry β ΡΡΠΎ ΡΠ»ΠΎΠ²Π°ΡΡ: {'src_ip': str, 'dst_port': int, 'event': str, 'timestamp': float}
"""
src = log_entry['src_ip']
self.logs[src].append(log_entry)
self._evaluate(src)
def _evaluate(self, src_ip):
entries = self.logs[src_ip]
now = time.time()
# ΠΡΠΈΡΡΠΊΠ° ΡΡΠ°ΡΡΡ
Π·Π°ΠΏΠΈΡΠ΅ΠΉ (Ρ
ΡΠ°Π½ΠΈΠΌ ΡΠΎΠ»ΡΠΊΠΎ ΠΏΠΎΡΠ»Π΅Π΄Π½ΠΈΠ΅ 60 ΡΠ΅ΠΊΡΠ½Π΄)
self.logs[src_ip] = [e for e in entries if now - e['timestamp'] <= 60]
# ΠΡΠΈΠΌΠ΅Ρ ΡΠ²ΡΠΈΡΡΠΈΠΊΠΈ
port_scans = len(set(e['dst_port'] for e in self.logs[src_ip] if e['event'] == 'connection_attempt'))
failed_logins = len([e for e in self.logs[src_ip] if e['event'] == 'failed_login'])
if port_scans > self.thresholds['port_scan']:
self._trigger_alert(src_ip, 'port_scan')
if failed_logins > self.thresholds['failed_login']:
self._trigger_alert(src_ip, 'failed_login')
def _trigger_alert(self, src_ip, alert_type):
alert = {
'src_ip': src_ip,
'type': alert_type,
'time': time.time()
}
self.alerts.append(alert)
print(f"[ALERT] ΠΠ±Π½Π°ΡΡΠΆΠ΅Π½ΠΎ {alert_type} Ρ {src_ip}")
def get_alerts(self):
return self.alerts
# ΠΡΠΈΠΌΠ΅Ρ ΠΈΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π½ΠΈΡ
if __name__ == "__main__":
detector = IntrusionDetector({'port_scan': 50, 'failed_login': 3})
# ΠΠΌΠΈΡΠ°ΡΠΈΡ ΠΏΡΠΈΠ΅ΠΌΠ° Π»ΠΎΠ³ΠΎΠ²
for i in range(60):
detector.ingest_log({
'src_ip': '192.168.1.10',
'dst_port': 22 + i % 5,
'event': 'connection_attempt',
'timestamp': time.time()
})
time.sleep(0.1)
```
**Π‘Ρ
Π΅ΠΌΠ° ΠΏΠΎΡΠΎΠΊΠ° Π΄Π°Π½Π½ΡΡ
(ASCII)**
```
[Π‘Π΅ΡΠ΅Π²ΠΎΠΉ ΡΡΠ°ΡΠΈΠΊ]
|
v
[ΠΠΎΠ»Π»Π΅ΠΊΡΠΎΡ Π»ΠΎΠ³ΠΎΠ²] --> (ΠΏΡΠ΅ΠΎΠ±ΡΠ°Π·ΠΎΠ²Π°Π½ΠΈΠ΅ Π² ΡΠ»ΠΎΠ²Π°ΡΡ) --> [IntrusionDetector]
| |
v v
[ΠΡΡΠ΅Ρ Π»ΠΎΠ³ΠΎΠ²] [ΠΠ±ΡΠ°Π±ΠΎΡΡΠΈΠΊ ΠΎΠΏΠΎΠ²Π΅ΡΠ΅Π½ΠΈΠΉ]
| |
+-----------------+
|
v
[Π₯ΡΠ°Π½ΠΈΠ»ΠΈΡΠ΅/Π²ΡΠ²ΠΎΠ΄ ΠΎΠΏΠΎΠ²Π΅ΡΠ΅Π½ΠΈΠΉ]
```
ΠΠ»ΡΡΠ΅Π²ΡΠ΅ ΠΌΠΎΠΌΠ΅Π½ΡΡ:
- `ΠΡΡΠ΅Ρ Π»ΠΎΠ³ΠΎΠ²` Ρ
ΡΠ°Π½ΠΈΡ Π½Π΅Π΄Π°Π²Π½ΠΈΠ΅ ΡΠΎΠ±ΡΡΠΈΡ Π΄Π»Ρ ΠΊΠ°ΠΆΠ΄ΠΎΠ³ΠΎ IP-Π°Π΄ΡΠ΅ΡΠ°.
- `_evaluate()` ΠΏΡΠΈΠΌΠ΅Π½ΡΠ΅Ρ ΡΠ²ΡΠΈΡΡΠΈΠΊΡ Π½Π° Ρ
ΠΎΠ΄Ρ.
- ΠΠΏΠΎΠ²Π΅ΡΠ΅Π½ΠΈΡ Π΄ΠΎΠ±Π°Π²Π»ΡΡΡΡΡ Π² ΠΏΡΠΎΡΡΠΎΠΉ ΡΠΏΠΈΡΠΎΠΊ ΠΈ Π²ΡΠ²ΠΎΠ΄ΡΡΡΡ Π½Π° ΠΏΠ΅ΡΠ°ΡΡ.
Π‘ΠΊΠ°ΠΆΠΈ, ΠΊΠ°ΠΊ ΡΠ΅Π±Π΅ ΠΏΠΎΡΠΎΠ³ΠΈ, ΠΈΠ»ΠΈ Π΄ΡΠΌΠ°Π΅ΡΡ, Π½ΡΠΆΠ΅Π½ Π±ΠΎΠ»Π΅Π΅ ΡΠ»ΠΎΠΆΠ½ΡΠΉ ΠΊΠΎΠ½Π΅ΡΠ½ΡΠΉ Π°Π²ΡΠΎΠΌΠ°Ρ.