Skip to content

Commit 07bec6c

Browse files
refactored code
1 parent 8f97a99 commit 07bec6c

15 files changed

Lines changed: 77 additions & 58 deletions

File tree

node/mesh_peer.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@ const PEER_SERVICE_NAME = "python-peer";
66
async function main() {
77
// 1. Initialize Configuration
88
const config: MeshConfig = {
9+
redis: { host: 'localhost', port: 6379 },
910
service_discovery: {
10-
redis_config: { host: 'localhost', port: 6379 },
1111
heartbeat_interval: 2,
1212
heartbeat_threshold: 3
1313
},
14+
pubsub: {},
1415
host: '127.0.0.1',
1516
port: 0
1617
};

node/src/communication/h2-communication.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import * as http2 from 'http2';
22
import { EventEmitter } from 'events';
33
import { Node, MeshConfig, Message } from '../shared/types';
4-
import { URL } from 'url';
54

65
import { IChannel, ICommunication } from '../abstract';
76

node/src/communication/pubsub.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import Redis from 'ioredis';
2-
import { ServiceDiscoveryConfig } from '../shared/types';
2+
import { RedisConfig } from '../shared/types';
33
import { EventEmitter } from 'events';
44

55
import { ITopic, IPubSub } from '../abstract';
@@ -31,21 +31,21 @@ class RedisTopicImpl implements ITopic {
3131
export class RedisPubSub extends EventEmitter implements IPubSub {
3232
private publisher: Redis;
3333
private subscriber: Redis;
34-
private config: ServiceDiscoveryConfig;
34+
private config: RedisConfig;
3535
private subscriptions: Map<string, Set<(msg: any) => void>> = new Map();
3636

37-
constructor(config: ServiceDiscoveryConfig) {
37+
constructor(config: RedisConfig) {
3838
super();
3939
this.config = config;
4040

41-
if (config.redis_config.url) {
42-
this.publisher = new Redis(config.redis_config.url);
43-
this.subscriber = new Redis(config.redis_config.url);
41+
if (config.url) {
42+
this.publisher = new Redis(config.url);
43+
this.subscriber = new Redis(config.url);
4444
} else {
4545
const redisOptions = {
46-
host: config.redis_config.host || 'localhost',
47-
port: config.redis_config.port || 6379,
48-
...config.redis_config
46+
host: config.host || 'localhost',
47+
port: config.port || 6379,
48+
...config
4949
};
5050
this.publisher = new Redis(redisOptions);
5151
this.subscriber = new Redis(redisOptions);

node/src/demo_client.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@ import { Mesh, MeshConfig } from './mesh';
22

33
async function main() {
44
const config: MeshConfig = {
5+
redis: { host: 'localhost', port: 6379 },
56
service_discovery: {
6-
redis_config: { host: 'localhost', port: 6379 },
77
heartbeat_interval: 2,
88
heartbeat_threshold: 3
99
},
10+
pubsub: {},
1011
host: '127.0.0.1',
1112
port: 0
1213
};

node/src/discovery/redis-discovery.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import Redis from 'ioredis';
2-
import { Node, ServiceDiscoveryConfig } from '../shared/types';
2+
import { Node, ServiceDiscoveryConfig, RedisConfig } from '../shared/types';
33
import { EventEmitter } from 'events';
44

55
import { IServiceDiscovery } from '../abstract';
@@ -9,16 +9,16 @@ export class RedisServiceDiscovery extends EventEmitter implements IServiceDisco
99
private config: ServiceDiscoveryConfig;
1010
private stopHeartbeat: boolean = false;
1111

12-
constructor(config: ServiceDiscoveryConfig) {
12+
constructor(redisConfig: RedisConfig, config: ServiceDiscoveryConfig) {
1313
super();
1414
this.config = config;
15-
if (config.redis_config.url) {
16-
this.redis = new Redis(config.redis_config.url);
15+
if (redisConfig.url) {
16+
this.redis = new Redis(redisConfig.url);
1717
} else {
1818
this.redis = new Redis({
19-
host: config.redis_config.host || 'localhost',
20-
port: config.redis_config.port || 6379,
21-
...config.redis_config
19+
host: redisConfig.host || 'localhost',
20+
port: redisConfig.port || 6379,
21+
...redisConfig
2222
});
2323
}
2424
}

node/src/mesh.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ export class Mesh implements IMesh {
2828

2929
constructor(config: MeshConfig) {
3030
this.config = config;
31-
this.discovery = new RedisServiceDiscovery(config.service_discovery);
31+
this.discovery = new RedisServiceDiscovery(config.redis, config.service_discovery);
3232
this.communication = new H2Communication(config);
33-
this.pubsub = new RedisPubSub(config.service_discovery);
33+
this.pubsub = new RedisPubSub(config.redis);
3434
this.nodeId = uuidv4();
3535
}
3636

node/src/shared/types.ts

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,26 @@ export interface Message {
1111
}
1212

1313
// Configuration Interfaces
14+
export interface RedisConfig {
15+
url?: string;
16+
host?: string;
17+
port?: number;
18+
[key: string]: any;
19+
}
20+
1421
export interface ServiceDiscoveryConfig {
15-
redis_config: {
16-
url?: string;
17-
host?: string;
18-
port?: number;
19-
[key: string]: any;
20-
};
2122
heartbeat_interval: number; // Seconds
2223
heartbeat_threshold: number;
2324
}
2425

26+
export interface PubSubConfig {
27+
// Specific params for pubsub if needed
28+
}
29+
2530
export interface MeshConfig {
31+
redis: RedisConfig; // Shared Redis Configuration
2632
service_discovery: ServiceDiscoveryConfig;
33+
pubsub?: PubSubConfig;
2734
host: string;
2835
port: number; // 0 for dynamic
2936
debug?: boolean;

node/test-pubsub.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,26 @@ import { Mesh, MeshConfig } from './src/mesh';
33
async function main() {
44
// Config for subscriber
55
const config1: MeshConfig = {
6+
redis: { host: 'localhost', port: 6379 },
67
host: 'localhost',
78
port: 0,
89
service_discovery: {
9-
redis_config: { host: 'localhost', port: 6379 },
1010
heartbeat_interval: 2,
1111
heartbeat_threshold: 3
12-
}
12+
},
13+
pubsub: {}
1314
};
1415

1516
// Config for publisher (must be a separate object to avoid port mutation conflict in tests)
1617
const config2: MeshConfig = {
18+
redis: { host: 'localhost', port: 6379 },
1719
host: 'localhost',
1820
port: 0,
1921
service_discovery: {
20-
redis_config: { host: 'localhost', port: 6379 },
2122
heartbeat_interval: 2,
2223
heartbeat_threshold: 3
23-
}
24+
},
25+
pubsub: {}
2426
};
2527

2628
const subscriber = new Mesh(config1);

python/mesh_peer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
async def main():
1414
# 1. Initialize Configuration
1515
config = MeshConfig(
16+
redis={"host": "localhost", "port": 6379},
1617
service_discovery=ServiceDiscoveryConfig(
17-
redis_config={"host": "localhost", "port": 6379},
1818
heartbeat_interval=2,
1919
heartbeat_threshold=3
2020
),

python/src/mesh_lib/communication/pubsub.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
import redis.asyncio as redis
44
from typing import Callable, Optional, Set, Dict, Any
55
from mesh_lib.abstract import ITopic, IPubSub
6-
from mesh_lib.shared.types import ServiceDiscoveryConfig
6+
from typing import Dict, Any
7+
from mesh_lib.shared.types import PubSubConfig
78

89
class RedisTopicImpl(ITopic):
910
def __init__(self, manager: 'RedisPubSub', topic_name: str):
@@ -23,20 +24,17 @@ async def close(self):
2324
await self.manager.unsubscribe(self.topic_name)
2425

2526
class RedisPubSub(IPubSub):
26-
def __init__(self, config: ServiceDiscoveryConfig):
27-
self.config = config
27+
def __init__(self, redis_config: Dict[str, Any]):
28+
self.config = redis_config # Store it if needed, or just use it
2829
self.subscriptions: Dict[str, Set[Callable[[any], None]]] = {}
2930

3031
# Redis setup
31-
# Note: shared/types.py defines ServiceDiscoveryConfig.redis_config as a dict
32-
redis_args = {}
33-
if "url" in config.redis_config:
34-
self.publisher = redis.from_url(config.redis_config["url"])
35-
self.subscriber = redis.from_url(config.redis_config["url"])
32+
if "url" in redis_config:
33+
self.publisher = redis.from_url(redis_config["url"])
34+
self.subscriber = redis.from_url(redis_config["url"])
3635
else:
37-
host = config.redis_config.get("host", "localhost")
38-
port = config.redis_config.get("port", 6379)
39-
# Filter unknown args? reusing dict directly might pass unwanted args but redis-py usually fine.
36+
host = redis_config.get("host", "localhost")
37+
port = redis_config.get("port", 6379)
4038
# safe defaults
4139
self.publisher = redis.Redis(host=host, port=port)
4240
self.subscriber = redis.Redis(host=host, port=port)

0 commit comments

Comments
 (0)