-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathModbusPollCommand.php
More file actions
81 lines (67 loc) · 2.9 KB
/
Copy pathModbusPollCommand.php
File metadata and controls
81 lines (67 loc) · 2.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
<?php
namespace App\Console\Commands;
use App\Models\MachineConnection;
use App\Services\Machine\MachineSignalIngestor;
use App\Services\Machine\Modbus\ModbusReader;
use Illuminate\Console\Command;
/**
* Long-running Modbus TCP poller. One process per machine_connection: connects,
* then every poll_interval_ms reads all active tags and feeds each into the
* MachineSignalIngestor. Reconnects with backoff on transport errors.
*
* php artisan modbus:poll --connection=3
*/
class ModbusPollCommand extends Command
{
protected $signature = 'modbus:poll {--connection= : machine_connection id} {--once : single poll cycle then exit (for testing)}';
protected $description = 'Poll a Modbus TCP device and ingest machine signals';
public function handle(MachineSignalIngestor $ingestor, \App\Services\Machine\RuntimeMonitor $runtime): int
{
$connection = MachineConnection::with(['modbusConnection', 'activeTags.workstation'])
->find($this->option('connection'));
if (! $connection || ! $connection->modbusConnection) {
$this->error('Modbus connection not found.');
return self::FAILURE;
}
$modbus = $connection->modbusConnection;
$tags = $connection->activeTags;
$intervalUs = max(100, $modbus->poll_interval_ms) * 1000;
$once = (bool) $this->option('once');
$this->info("Polling {$connection->name} ({$modbus->host}:{$modbus->port}), {$tags->count()} tags");
do {
$reader = new ModbusReader($modbus);
try {
$reader->connect();
$connection->markConnected();
do {
$cycleStart = microtime(true);
$runtime->heartbeat($connection->protocol, $connection->id);
foreach ($tags as $tag) {
try {
$value = $reader->readTag($tag);
$ingestor->ingest($tag, $value);
$connection->increment('messages_received');
} catch (\Throwable $e) {
$this->warn("tag {$tag->name}: {$e->getMessage()}");
}
}
if ($once) {
break 2;
}
$elapsed = (int) ((microtime(true) - $cycleStart) * 1_000_000);
usleep(max(0, $intervalUs - $elapsed));
} while (true);
} catch (\Throwable $e) {
$connection->markError($e->getMessage());
$this->error("connection error: {$e->getMessage()}");
if ($once) {
return self::FAILURE;
}
sleep(5); // backoff before reconnect
} finally {
$reader->close();
}
} while (! $once);
return self::SUCCESS;
}
}