-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi_server.py
More file actions
146 lines (111 loc) · 3.76 KB
/
api_server.py
File metadata and controls
146 lines (111 loc) · 3.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from flask import Flask, jsonify, request
import sqlite3
from datetime import datetime
import json
# Flask application object; the @app.route decorators below register the
# job endpoints on it.
app = Flask(__name__)
# SQLite database file handed to sqlite3.connect(). It is a relative path,
# so it is resolved against the process's current working directory.
DB_PATH = 'jobs.db'
def get_db_connection(db_path=None) -> sqlite3.Connection:
    """Open a new SQLite connection whose rows support access by column name.

    Args:
        db_path: Database to open. Defaults to the module-level ``DB_PATH``
            when omitted; pass e.g. ``':memory:'`` to use a throwaway
            in-memory database.

    Returns:
        A ``sqlite3.Connection`` with ``row_factory`` set to ``sqlite3.Row``,
        so fetched rows can be indexed by column name and passed to
        ``dict()``. The caller is responsible for closing it.
    """
    target = DB_PATH if db_path is None else db_path
    conn = sqlite3.connect(target)
    conn.row_factory = sqlite3.Row
    return conn
@app.route('/jobs', methods=['GET'])
def get_jobs():
    """Return jobs as a JSON array, optionally filtered by status.

    Query parameters:
        status: When present and non-empty, only jobs whose ``status``
            column equals this value are returned. No validation is applied;
            an unknown status simply yields an empty list.

    Returns:
        A JSON list with one object per job row, ordered by ``id``.
    """
    status = request.args.get('status')
    conn = get_db_connection()
    try:
        if status:
            jobs = conn.execute(
                'SELECT * FROM jobs WHERE status = ? ORDER BY id',
                (status,),
            ).fetchall()
        else:
            jobs = conn.execute('SELECT * FROM jobs ORDER BY id').fetchall()
    finally:
        # Close even when the query raises, so a failing request does not
        # leave the connection open.
        conn.close()
    return jsonify([dict(job) for job in jobs])
@app.route('/jobs/<int:job_id>', methods=['GET'])
def get_job(job_id):
    """Return a single job by its ID.

    Args:
        job_id: Integer job ID taken from the URL path.

    Returns:
        The job row as a JSON object, or a 404 response with
        ``{'error': 'Job not found'}`` when no row has that ID.
    """
    conn = get_db_connection()
    try:
        job = conn.execute(
            'SELECT * FROM jobs WHERE id = ?', (job_id,)
        ).fetchone()
    finally:
        # Close even when the query raises, so a failing request does not
        # leave the connection open.
        conn.close()
    if job is None:
        return jsonify({'error': 'Job not found'}), 404
    return jsonify(dict(job))
@app.route('/jobs/<int:job_id>/status', methods=['PUT'])
def update_job_status(job_id):
    """Update a job's status and overwrite its stored result.

    Expected JSON body:
    {
        "status": "pending|processing|completed|failed",
        "result": "optional result data"
    }

    The ``result`` column is written on every call: when ``result`` is
    omitted it is set to NULL. A JSON object or array given as ``result`` is
    stored as JSON text, because SQLite cannot bind dicts or lists directly.

    Args:
        job_id: Integer job ID taken from the URL path.

    Returns:
        200 with the updated job row; 400 when the body is missing, empty,
        not a JSON object, or carries an unknown status; 404 when the job
        does not exist.
    """
    data = request.get_json(silent=True)
    # A non-object JSON body (list, string, number) has no .get(); reject it
    # here instead of failing later with an AttributeError (HTTP 500).
    if not isinstance(data, dict) or not data:
        return jsonify({'error': 'Invalid or missing JSON body'}), 400

    status = data.get('status')
    result = data.get('result')

    valid_statuses = ['pending', 'processing', 'completed', 'failed']
    if status not in valid_statuses:
        return jsonify({
            'error': 'Invalid status',
            'valid_statuses': valid_statuses
        }), 400

    # Structured results are serialized the same way create_job serializes a
    # dict payload; scalars and strings are bound as-is.
    if isinstance(result, (dict, list)):
        result = json.dumps(result)

    conn = get_db_connection()
    try:
        existing_job = conn.execute(
            'SELECT * FROM jobs WHERE id = ?', (job_id,)
        ).fetchone()
        if existing_job is None:
            return jsonify({'error': 'Job not found'}), 404

        conn.execute(
            '''
            UPDATE jobs
            SET status = ?, result = ?, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            ''',
            (status, result, job_id)
        )
        conn.commit()

        # Re-read the row so the response reflects the stored values,
        # including the database-generated updated_at.
        updated_job = conn.execute(
            'SELECT * FROM jobs WHERE id = ?', (job_id,)
        ).fetchone()
        return jsonify(dict(updated_job)), 200
    finally:
        # Runs on every exit path (404, success, or exception) so the
        # connection is never leaked.
        conn.close()
@app.route('/jobs', methods=['POST'])
def create_job():
    """Create a new job in the ``pending`` state.

    Expected JSON body:
    {
        "job_type": "backup|cleanup|report",
        "payload": "JSON string with job parameters"
    }

    ``payload`` may be a string (stored as given) or a JSON object (stored
    as its JSON text). Any other type is rejected.

    Returns:
        201 with the newly inserted job row; 400 when the body is missing,
        empty, not a JSON object, has an unknown ``job_type``, or has a
        missing or wrongly typed ``payload``.
    """
    data = request.get_json(silent=True)
    # A non-object JSON body (list, string, number) has no .get(); reject it
    # here instead of failing later with an AttributeError (HTTP 500).
    if not isinstance(data, dict) or not data:
        return jsonify({'error': 'Invalid or missing JSON body'}), 400

    job_type = data.get('job_type')
    payload = data.get('payload')

    valid_job_types = ['backup', 'cleanup', 'report']
    if job_type not in valid_job_types:
        return jsonify({
            'error': 'Invalid job_type',
            'valid_job_types': valid_job_types
        }), 400

    if payload is None:
        return jsonify({'error': 'payload is required'}), 400
    if isinstance(payload, dict):
        payload = json.dumps(payload)
    elif not isinstance(payload, str):
        return jsonify({'error': 'payload must be a JSON string or object'}), 400

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            '''
            INSERT INTO jobs (job_type, payload, status, result, created_at, updated_at)
            VALUES (?, ?, 'pending', NULL, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
            ''',
            (job_type, payload)
        )
        conn.commit()
        job_id = cursor.lastrowid

        # Re-read the row so the response includes the database-generated
        # id and timestamps.
        created_job = conn.execute(
            'SELECT * FROM jobs WHERE id = ?', (job_id,)
        ).fetchone()
        return jsonify(dict(created_job)), 201
    finally:
        # Runs on success and on exception so the connection is never leaked.
        conn.close()
if __name__ == '__main__':
    import os

    # The server listens on all interfaces (0.0.0.0), and Flask's debug mode
    # enables the Werkzeug interactive debugger, which lets a client execute
    # arbitrary Python code. Debug mode is therefore opt-in: set
    # FLASK_DEBUG=1 (or true/yes/on) for local development only.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in (
        '1', 'true', 'yes', 'on',
    )
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)