Skip to content

Commit 09ec9f0

Browse files
committed
[VERSION] - This is TokenGate beta, partial is now used instead of lambda for executions, concurrency boosted, types hinted.
1 parent c790ebd commit 09ec9f0

13 files changed

+49
-685
lines changed

DOCS/SETUP.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class OperationsCoordinator: # Under this class are where you find configuration
2424
self,
2525
workers_per_core: int = 4, # Controls number of workers and may notgo under 2 workers.
2626
enable_convergence: bool = True, # Controls dynamic worker counts.
27-
convergence_verbose: bool = False,
27+
convergence_verbose: bool = False, # DO NOT ENABLE INSIDE A REPL!!!
2828
base_memory_budget_mb: int = 45, # This is a control for operation memory (adjustable).
2929
num_executors: int = 8, # This controls number of parallel executors.
3030
auto_block_dangerous: bool = False, # Blocks explosive tasks with possibly dangerous inputs.

allocation_optimizer.py

Lines changed: 1 addition & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -407,108 +407,4 @@ def print_portfolio(self):
407407
print(f" Success Rate: {alloc.get_success_rate():.1f}%")
408408

409409
print()
410-
print("=" * 80)
411-
412-
413-
# ============================================================================
414-
# TESTING
415-
# ============================================================================
416-
417-
if __name__ == '__main__':
418-
from .code_inspector import CodeInspector
419-
420-
print("=" * 70)
421-
print("ALLOCATION OPTIMIZER TEST")
422-
print("=" * 70)
423-
print()
424-
425-
# Create optimizer
426-
optimizer = AllocationOptimizer(base_budget_mb=50)
427-
428-
429-
# Test functions with different complexity
430-
def trivial_task(x):
431-
return x * 2
432-
433-
434-
def moderate_task(data):
435-
result = []
436-
for item in data:
437-
if item > 0:
438-
result.append(item * 2)
439-
return result
440-
441-
442-
def extreme_task(size):
443-
matrix = [[i * j for j in range(size)] for i in range(size)]
444-
return sum(sum(row) for row in matrix)
445-
446-
447-
# Analyze and initialize operations
448-
test_ops = [
449-
('trivial_op', trivial_task),
450-
('moderate_op', moderate_task),
451-
('extreme_op', extreme_task)
452-
]
453-
454-
for op_name, func in test_ops:
455-
print("-" * 70)
456-
print(f"Initializing: {op_name}")
457-
print("-" * 70)
458-
459-
metrics = CodeInspector.analyze(func)
460-
optimizer.initialize_operation(op_name, metrics)
461-
print()
462-
463-
# Simulate optimization cycles
464-
print("=" * 70)
465-
print("SIMULATING OPTIMIZATION CYCLES")
466-
print("=" * 70)
467-
print()
468-
469-
# Trivial operation - should optimize aggressively
470-
print("-" * 70)
471-
print("TRIVIAL OPERATION - Aggressive Optimization (5% rate)")
472-
print("-" * 70)
473-
474-
for cycle in range(5):
475-
print(f"\nCycle {cycle + 1}:")
476-
477-
# Start at baseline and increment
478-
confidence = 85.0 + (cycle * 1.5)
479-
480-
new_alloc = optimizer.attempt_optimization('trivial_op', confidence)
481-
482-
if new_alloc:
483-
# Simulate successful execution with confidence gain
484-
success_confidence = confidence + 1.5
485-
optimizer.record_execution_result('trivial_op', True, success_confidence)
486-
else:
487-
print(" Optimization blocked")
488-
489-
# Extreme operation - should optimize conservatively
490-
print()
491-
print("-" * 70)
492-
print("EXTREME OPERATION - Conservative Optimization (2% rate)")
493-
print("-" * 70)
494-
495-
for cycle in range(5):
496-
print(f"\nCycle {cycle + 1}:")
497-
498-
# Start at baseline and increment
499-
confidence = 83.7 + (cycle * 1.0)
500-
501-
new_alloc = optimizer.attempt_optimization('extreme_op', confidence)
502-
503-
if new_alloc:
504-
# Simulate with smaller confidence gains
505-
success_confidence = confidence + 1.0
506-
optimizer.record_execution_result('extreme_op', True, success_confidence)
507-
else:
508-
print(" Optimization blocked")
509-
510-
# Show final portfolio
511-
optimizer.print_portfolio()
512-
513-
print()
514-
print("Test complete!")
410+
print("=" * 80)

code_inspector.py

Lines changed: 1 addition & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -359,105 +359,4 @@ def predict_initial_allocation(metrics: CodeMetrics, base_budget_mb: int = 50) -
359359
'confidence': metrics.confidence,
360360
'reasoning': reasoning,
361361
'complexity_score': metrics.complexity_score
362-
}
363-
364-
365-
# ============================================================================
366-
# TESTING & EXAMPLES
367-
# ============================================================================
368-
369-
if __name__ == '__main__':
370-
371-
print("=" * 70)
372-
print("CODE INSPECTOR TEST")
373-
print("=" * 70)
374-
print()
375-
376-
377-
# Test functions with varying complexity
378-
379-
def trivial_function(x):
380-
"""Trivial: just return input."""
381-
return x
382-
383-
384-
def simple_function(a, b):
385-
"""Simple: basic arithmetic."""
386-
return a + b * 2
387-
388-
389-
def moderate_function(data):
390-
"""Moderate: some loops and logic."""
391-
result = []
392-
for item in data:
393-
if item > 0:
394-
result.append(item * 2)
395-
return result
396-
397-
398-
def complex_function(size):
399-
"""Complex: nested loops."""
400-
matrix = [[0 for _ in range(size)] for _ in range(size)]
401-
for i in range(size):
402-
for j in range(size):
403-
matrix[i][j] = i * j
404-
return matrix
405-
406-
407-
def extreme_function(n):
408-
"""Extreme: recursive + heavy computation."""
409-
if n <= 1:
410-
return 1
411-
result = extreme_function(n - 1) + extreme_function(n - 2)
412-
data = [i ** 2 for i in range(100)]
413-
return result + sum(data)
414-
415-
416-
# Analyze each function
417-
test_functions = [
418-
trivial_function,
419-
simple_function,
420-
moderate_function,
421-
complex_function,
422-
extreme_function
423-
]
424-
425-
for func in test_functions:
426-
print("-" * 70)
427-
print(f"Analyzing: {func.__name__}")
428-
print("-" * 70)
429-
430-
metrics = CodeInspector.analyze(func)
431-
432-
print(f" Bytecode length: {metrics.bytecode_length}")
433-
print(f" Stack size: {metrics.stack_size}")
434-
print(f" Local vars: {metrics.local_vars}")
435-
print(f" Constants: {metrics.const_count} ({metrics.const_footprint} bytes)")
436-
print(f" External calls: {len(metrics.external_calls)}")
437-
if metrics.external_calls:
438-
print(f" Calls: {', '.join(metrics.external_calls[:5])}")
439-
print(f" Loops: {metrics.loop_count}")
440-
print(f" Branches: {metrics.branch_count}")
441-
print()
442-
print(f" Complexity Score: {metrics.complexity_score:.1f}")
443-
print(f" Complexity Level: {metrics.complexity_level.name}")
444-
print(f" Confidence: {metrics.confidence}%")
445-
print()
446-
447-
# Get allocation prediction
448-
allocation = CodeInspector.predict_initial_allocation(metrics)
449-
print(f" Recommended Allocation:")
450-
print(f" Memory: {allocation['memory_mb']} MB")
451-
print(f" Timeout: {allocation['timeout_seconds']:.0f} seconds")
452-
print(f" Reasoning: {allocation['reasoning']}")
453-
print()
454-
455-
print("=" * 70)
456-
print("INSPECTOR TEST COMPLETE")
457-
print("=" * 70)
458-
print()
459-
print("Next Steps:")
460-
print(" 1. Integrate with overflow_guard.py")
461-
print(" 2. Add token-level tracking")
462-
print(" 3. Implement retry logic with allocation bumping")
463-
print(" 4. Build confidence-weighted optimization (2% drops)")
362+
}

core_affinity_queue.py

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
from queue import Queue
1919
from typing import List
2020

21-
from .token_system import TaskToken
22-
2321

2422
class TaskWeight(Enum):
2523
"""Routing weight classes used by the affinity policy."""
@@ -190,33 +188,6 @@ def __init__(self, topology, workers_per_core: int = 4):
190188
self.routing_failures = 0
191189
self._routing_lock = threading.Lock()
192190

193-
def classify_token_weight(self, token: TaskToken) -> TaskWeight:
194-
"""Infer routing weight from token tags or operation type.
195-
196-
Tag-based weight takes precedence. If no explicit weight tag is present,
197-
the operation type is inspected for heavy/light hints. Medium is the
198-
fallback class.
199-
"""
200-
# Check tags first
201-
if 'weight' in token.metadata.tags:
202-
weight_str = token.metadata.tags['weight'].lower()
203-
if weight_str == 'heavy':
204-
return TaskWeight.HEAVY
205-
elif weight_str == 'light':
206-
return TaskWeight.LIGHT
207-
else:
208-
return TaskWeight.MEDIUM
209-
210-
# Check operation_type suffix
211-
op_type = token.metadata.operation_type.lower()
212-
if op_type.endswith('_heavy') or 'heavy' in op_type:
213-
return TaskWeight.HEAVY
214-
elif op_type.endswith('_light') or 'light' in op_type:
215-
return TaskWeight.LIGHT
216-
217-
# Default to medium
218-
return TaskWeight.MEDIUM
219-
220191
def get_valid_cores_for_weight(self, weight: TaskWeight) -> List[int]:
221192
"""Return the allowed core chain for the given weight."""
222193
return self.policy.get_preference_chain(weight)

core_pinned_staggered_queue.py

Lines changed: 12 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,11 @@
1515
"""
1616

1717
import asyncio
18-
import os
19-
import sys
2018
import time
19+
from functools import partial
2120
from typing import Dict, List, Tuple, Any
2221

2322
from .threading_metrics import get_metrics
24-
25-
# Add project paths for imports
26-
current_dir = os.path.dirname(os.path.abspath(__file__))
27-
if current_dir not in sys.path:
28-
sys.path.insert(0, current_dir)
29-
parent_dir = os.path.dirname(current_dir)
30-
if parent_dir not in sys.path:
31-
sys.path.insert(0, parent_dir)
32-
3323
from .token_system import TaskToken, TokenState
3424
from .admission_gate import WorkerTaskQueue
3525
from .core_affinity_queue import TaskWeight
@@ -58,6 +48,7 @@ def __init__(self, num_cores: int, workers_per_core: int, coordinator: Any):
5848
retry, overflow, and Guard House callbacks.
5949
"""
6050
super().__init__()
51+
self.result_verbose = False
6152
self.coordinator = coordinator
6253
self.num_cores = num_cores
6354
self.workers_per_core = workers_per_core
@@ -79,7 +70,7 @@ def __init__(self, num_cores: int, workers_per_core: int, coordinator: Any):
7970
self.core_busy: Dict[int, int] = {c: 0 for c in range(1, self.num_cores + 1)}
8071

8172
# Capped mailbox length to prevent runaway memory (DOS safety)
82-
self.MAILBOX_MAX = 100 # Max tokens per worker mailbox
73+
self.MAILBOX_MAX = 75 # Max tokens per worker mailbox
8374

8475
self.core_patterns: Dict[int, int] = {}
8576
for core_id in range(1, num_cores + 1):
@@ -176,12 +167,9 @@ async def _execute_token(self, token: TaskToken, worker_id: str, core_id: int):
176167
success = False
177168

178169
try:
179-
# Execute function
180-
result = await asyncio.get_event_loop().run_in_executor(
181-
None,
182-
lambda: token.func(*token.args, **token.kwargs))
183-
184-
# Success!
170+
loop = asyncio.get_running_loop()
171+
bound_func = partial(token.func, *token.args, **token.kwargs)
172+
result = await loop.run_in_executor(None, bound_func)
185173
token.set_result(result)
186174
self.total_executed += 1
187175
success = True
@@ -192,8 +180,9 @@ async def _execute_token(self, token: TaskToken, worker_id: str, core_id: int):
192180
# Failed!
193181
token.set_error(e)
194182
self.total_failed += 1
183+
if self.result_verbose:
184+
print(f"[{worker_id.upper()}] ✗ Failed {token.token_id}: {e}")
195185

196-
print(f"[{worker_id.upper()}] ✗ Failed {token.token_id}: {e}")
197186
finally:
198187
execution_duration = time.time() - start_time
199188

@@ -276,13 +265,6 @@ async def _execute_token_with_metrics(self, token: "TaskToken", worker_id: str,
276265
"""Execute one token while updating worker-state and outcome metrics."""
277266
op_type = token.metadata.tags.get("operation_type", "unknown")
278267

279-
# Busy/idle update (pattern aware)
280-
active_workers = self.core_patterns.get(core_id, self.workers_per_core)
281-
self.core_busy[core_id] = min(active_workers, self.core_busy.get(core_id, 0) + 1)
282-
busy = self.core_busy[core_id]
283-
idle = max(0, active_workers - busy)
284-
self.metrics.update_worker_state(core_id, busy, idle)
285-
286268
t0 = time.perf_counter()
287269
try:
288270
await self._execute_token(token, worker_id, core_id)
@@ -291,12 +273,6 @@ async def _execute_token_with_metrics(self, token: "TaskToken", worker_id: str,
291273
except Exception:
292274
self.metrics.record_task_failure(op_type, core_id)
293275
raise
294-
finally:
295-
# Decrement busy and update again
296-
self.core_busy[core_id] = max(0, self.core_busy.get(core_id, 0) - 1)
297-
busy = self.core_busy[core_id]
298-
idle = max(0, active_workers - busy)
299-
self.metrics.update_worker_state(core_id, busy, idle)
300276

301277
def get_core_for_weight(self, weight: TaskWeight) -> List[int]:
302278
"""Return the eligible core range for a routing weight.
@@ -327,7 +303,8 @@ def get_core_for_weight(self, weight: TaskWeight) -> List[int]:
327303
# Fallback for single-core
328304
return [1]
329305

330-
def classify_token_weight(self, token: TaskToken) -> TaskWeight:
306+
@staticmethod
307+
def classify_token_weight(token: TaskToken) -> TaskWeight:
331308
"""Infer routing weight from token tags or operation-type naming.
332309
333310
Explicit weight tags take precedence over operation-type heuristics.
@@ -382,7 +359,7 @@ def assign_worker_positions(self, worker_id: str, worker_index: int, core_id: in
382359

383360
print(f"[CORE_PINNED] Worker {worker_id} (Core {core_id}): {positions[:5]}... (every {self.total_workers})")
384361

385-
def assign_position_for_token(self, token: TaskToken, verbose = True) -> int:
362+
def assign_position_for_token(self, token: TaskToken) -> int:
386363
"""Assign a staggered global route position for a token.
387364
388365
The assigned position respects token weight, valid-core range, current
@@ -412,7 +389,7 @@ def assign_position_for_token(self, token: TaskToken, verbose = True) -> int:
412389
# Increment counter
413390
self.core_position_counters[chosen_core] += 1
414391

415-
if verbose:
392+
if self.result_verbose:
416393
print(f"[ROUTING] Token {token.token_id} ({weight.value}) → Pos {position} (Core {chosen_core}, Pattern {active_workers})")
417394
return position
418395

0 commit comments

Comments
 (0)