-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathipfs_wrapper.py
More file actions
90 lines (78 loc) · 3.19 KB
/
ipfs_wrapper.py
File metadata and controls
90 lines (78 loc) · 3.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import os
import subprocess
import platform
from tkinter import messagebox
from io import BytesIO
class IPFSWrapper:
def __init__(self):
self.ipfs_process = None
self.status = "IPFS Stopped"
def add(self, file_path):
try:
result = subprocess.run(['ipfs', 'add', file_path], capture_output=True, text=True)
if result.returncode != 0:
self.error_reporting(result.stderr)
return None
lines = result.stdout.strip().split('\n')
last_line = lines[-1]
ipfs_hash = last_line.split()[1]
print(f"File {file_path} added to IPFS with hash {ipfs_hash}")
return ipfs_hash
except Exception as e:
self.error_reporting("Add to IPFS failed.", e)
return None
def error_reporting(self, message, e=None):
if e:
message += f": {str(e)}"
print(message)
messagebox.showerror("IPFS Error", message)
def cat_file(self, ipfs_hash):
try:
result = subprocess.run(['ipfs', 'cat', ipfs_hash], capture_output=True)
if result.returncode != 0:
self.error_reporting(result.stderr)
return None
file_bytes = BytesIO(result.stdout)
return file_bytes
except Exception as e:
self.error_reporting("Error: Could not get file from IPFS.", e)
return None
def get_status(self):
return "IPFS Stopped" if self.ipfs_process is None else "IPFS Running"
def pin_add(self, ipfs_hash):
result = subprocess.run(['ipfs', 'pin', 'add', ipfs_hash], capture_output=True, text=True)
if result.returncode != 0:
self.error_reporting(result.stderr)
return None
pinned_hash = ipfs_hash
print(f"File with IPFS hash {ipfs_hash} pinned")
return pinned_hash
def pin_rm(self, ipfs_hash):
result = subprocess.run(['ipfs', 'pin', 'rm', ipfs_hash], capture_output=True, text=True)
if result.returncode != 0:
self.error_reporting(result.stderr)
return None
unpinned_hash = ipfs_hash
print(f"File with IPFS hash {ipfs_hash} unpinned")
return unpinned_hash
def start_daemon(self):
if self.ipfs_process is None:
with open(os.devnull, 'w') as devnull:
if platform.system() == 'Windows':
self.ipfs_process = subprocess.Popen('start /B ipfs daemon > NUL 2>&1', shell=True, stdout=devnull, stderr=devnull)
else:
self.ipfs_process = subprocess.Popen(['ipfs', 'daemon'], stdout=devnull, stderr=devnull, preexec_fn=os.setpgrp)
print("IPFS daemon started")
self.get_status()
def stop_daemon(self):
if self.ipfs_process is not None:
if platform.system() == 'Windows':
subprocess.call('ipfs shutdown', shell=True)
else:
subprocess.call(['ipfs', 'shutdown'])
self.ipfs_process.wait()
self.ipfs_process = None
print("IPFS daemon stopped")
self.get_status()
if __name__ == '__main__':
ipfs_wrapper = IPFSWrapper()