Skip to content

Commit ac72c4e

Browse files
committed
FUSE: initial commit
1 parent cd0b44e commit ac72c4e

File tree

3 files changed

+344
-1
lines changed

3 files changed

+344
-1
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,6 @@ dask-worker-space
8080

8181
## Test configuration
8282
singlestoredb/mysql/tests/databases.json
83+
84+
## Env vars
85+
.env

s4mount.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
#!/usr/bin/env python3
2+
# -*- coding: utf-8 -*-
3+
'''
4+
s4mount.py - Mount SingleStore file spaces with pyfuse3 (derived from the pyfuse3 hello.py example).
5+
6+
This program mounts the personal space, the shared space, and the stage of every workspace group below a single mountpoint.
7+
8+
Copyright © 2015 Nikolaus Rath <Nikolaus.org>
9+
Copyright © 2015 Gerion Entrup.
10+
11+
Permission is hereby granted, free of charge, to any person obtaining a copy of
12+
this software and associated documentation files (the "Software"), to deal in
13+
the Software without restriction, including without limitation the rights to
14+
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
15+
the Software, and to permit persons to whom the Software is furnished to do so.
16+
17+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
19+
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
20+
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
21+
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
22+
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23+
'''
24+
25+
import os
26+
import sys
27+
28+
from argparse import ArgumentParser
29+
import logging
30+
import singlestoredb as s2
31+
32+
33+
try:
34+
import faulthandler
35+
except ImportError:
36+
pass
37+
else:
38+
faulthandler.enable()
39+
40+
log = logging.getLogger(__name__)
41+
42+
# def init_logging(debug=False):
43+
# formatter = logging.Formatter('%(asctime)s.%(msecs)03d %(threadName)s: '
44+
# '[%(name)s] %(message)s', datefmt="%Y-%m-%d %H:%M:%S")
45+
# handler = logging.StreamHandler()
46+
# handler.setFormatter(formatter)
47+
# root_logger = logging.getLogger()
48+
# if debug:
49+
# handler.setLevel(logging.DEBUG)
50+
# root_logger.setLevel(logging.DEBUG)
51+
# else:
52+
# handler.setLevel(logging.INFO)
53+
# root_logger.setLevel(logging.INFO)
54+
# root_logger.addHandler(handler)
55+
56+
def daemonize() -> None:
    """Redirect stdin, stdout and stderr of this process to the null device.

    Only the standard streams are redirected; the process is not forked or
    detached from its session.
    """
    targets = [stream.fileno() for stream in (sys.stdin, sys.stdout, sys.stderr)]
    devnull = os.open(os.devnull, os.O_RDWR)
    try:
        for fd in targets:
            os.dup2(devnull, fd)
    finally:
        # The duplicates keep the null device open; release the temporary
        # descriptor unless it happens to be one of the standard streams.
        if devnull not in targets:
            os.close(devnull)
61+
62+
def parse_args(argv=None):
    '''Parse the command line.

    Parameters
    ----------
    argv : list of str, optional
        Arguments to parse. When omitted, ``sys.argv[1:]`` is used
        (the argparse default).

    Returns
    -------
    argparse.Namespace
        Namespace with a single attribute, ``mountpoint``.
    '''
    parser = ArgumentParser()
    parser.add_argument(
        'mountpoint', type=str,
        help='Where to mount the file system',
    )
    return parser.parse_args(argv)
70+
71+
def mountStage(access_token, base_url, workspaceGroupID, mountpoint):
    """Mount the stage of one workspace group below ``<mountpoint>/stage/``.

    The target directory ``<mountpoint>/stage/<workspaceGroupID>`` is created
    if needed, the workspace group is looked up through a fresh workspace
    manager, and its stage is mounted onto that directory.
    """
    target = f"{mountpoint}/stage/{workspaceGroupID}"
    os.makedirs(target, exist_ok=True)
    group = s2.manage_workspaces(
        access_token, base_url=base_url,
    ).get_workspace_group(workspaceGroupID)
    group.stage.mount(target)
76+
77+
def _fork_mount(label, target, mount):
    """Create *target* and run the zero-argument callable *mount* in a forked child.

    The parent returns immediately after forking. The child never returns:
    it exits with status 0 once *mount* returns, or with status 1 (after
    logging the traceback) if *mount* raises, so a failed mount cannot fall
    back into the caller's code.
    """
    os.makedirs(target, exist_ok=True)
    print(f"Mounting {label}")
    if os.fork() != 0:
        return

    status = 1
    try:
        mount()
        status = 0
    except Exception:
        log.exception("Mounting %s failed", label)
    finally:
        os._exit(status)


def main():
    """Mount the personal space, the shared space and every stage.

    Credentials come from the ``API_KEY`` and ``API_BASEURL`` environment
    variables; the process exits with status 1 and a message on stderr when
    either is missing. Each mount runs in its own forked child process.
    """
    options = parse_args()

    access_token = os.getenv("API_KEY")
    base_url = os.getenv("API_BASEURL")
    if not access_token:
        sys.exit("API_KEY not set")
    if not base_url:
        sys.exit("API_BASEURL not set")

    fileManager = s2.manage_files(access_token, base_url=base_url)

    # Mount personal notebooks
    personal_dir = f"{options.mountpoint}/personal"
    _fork_mount(
        "personal", personal_dir,
        lambda: fileManager.personal_space.mount(personal_dir),
    )

    # Mount shared notebooks
    shared_dir = f"{options.mountpoint}/shared"
    _fork_mount(
        "shared", shared_dir,
        lambda: fileManager.shared_space.mount(shared_dir),
    )

    # Mount stage for each workspace group
    workspaceManager = s2.manage_workspaces(access_token, base_url=base_url)
    for group in workspaceManager.workspace_groups:
        workspaceGroupID = group.id
        _fork_mount(
            f"stage/{workspaceGroupID}",
            f"{options.mountpoint}/stage/{workspaceGroupID}",
            # mountStage builds its own manager inside the child process.
            lambda group_id=workspaceGroupID: mountStage(
                access_token, base_url, group_id, options.mountpoint,
            ),
        )
114+
115+
if __name__ == '__main__':
116+
main()
117+

singlestoredb/management/files.py

Lines changed: 224 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,16 @@
33
from __future__ import annotations
44

55
import datetime
6+
import errno
67
import glob
78
import io
89
import os
910
import re
1011
from abc import ABC
1112
from abc import abstractmethod
13+
import stat
14+
import sys
15+
import time
1216
from typing import Any
1317
from typing import BinaryIO
1418
from typing import Dict
@@ -17,6 +21,9 @@
1721
from typing import TextIO
1822
from typing import Union
1923

24+
import pyfuse3
25+
import trio
26+
2027
from .. import config
2128
from ..exceptions import ManagementError
2229
from .manager import Manager
@@ -311,7 +318,7 @@ def getctime(self) -> float:
311318
if self.created_at is None:
312319
return 0.0
313320
return self.created_at.timestamp()
314-
321+
315322

316323
class FilesObjectTextWriter(io.StringIO):
317324
"""StringIO wrapper for writing to FileLocation."""
@@ -349,6 +356,208 @@ class FilesObjectBytesReader(io.BytesIO):
349356
"""BytesIO wrapper for reading from FileLocation."""
350357

351358

359+
class Inode:
    """One cached entry (file or directory) of a mounted file location.

    Directories are recognised purely by name: the root has the empty name
    and every other directory name ends with ``/``.

    Parameters
    ----------
    fs : object
        Owning file system; must provide ``next_id``, ``inodes`` and ``stage``.
    parent : Inode or None
        Parent directory, ``None`` for the root.
    name : str
        Entry name as listed by the stage (directories keep their ``/``).
    id : int, optional
        Inode number. When omitted, ``fs.next_id`` is used and incremented.
    """

    def __init__(self, fs, parent, name, id=None):
        self.fs = fs

        if id is None:
            id = fs.next_id
            fs.next_id += 1

        self.name = name
        self.parent = parent
        self.id = id
        # None means "not listed yet"; a list holds the ids of the children.
        self._children = None

    def __repr__(self):
        return f"Inode({self.id}, \"{self.name}\", {self._children})"

    def getNameWithoutTrailingSlash(self):
        """Return the name with the directory marker ``/`` stripped."""
        if self.name.endswith("/"):
            return self.name[:-1]
        return self.name

    def getStagePath(self):
        """Return the path inside the stage: all ancestor names concatenated."""
        if self.parent is None:
            return self.name
        return self.parent.getStagePath() + self.name

    def isDir(self):
        """Return True for the root (empty name) and names ending in ``/``."""
        return self.name == "" or self.name.endswith("/")

    def isFile(self):
        """Return True for everything that is not a directory."""
        return not self.isDir()

    def children(self):
        """Return the list of child inode ids, listing the stage on first use.

        The returned list is the cache itself, so callers may append to or
        remove from it to keep the cache in sync with the stage.
        """
        if self._children is None:
            # List first and publish the cache only afterwards: if listdir
            # raises, the next call retries instead of seeing an empty dir.
            child_names = self.fs.stage.listdir(self.getStagePath())
            child_ids = []
            for child_name in child_names:
                child = Inode(self.fs, self, child_name)
                self.fs.inodes[child.id] = child
                child_ids.append(child.id)
            self._children = child_ids

        return self._children
403+
404+
class SinglestoreFS(pyfuse3.Operations):
    """pyfuse3 request handlers that expose a ``FileLocation`` as a file system.

    Every entry is cached as an ``Inode`` in ``self.inodes`` (inode id ->
    Inode); directory listings are fetched lazily through ``Inode.children``.
    File reads and writes transfer the complete file on every call.

    Example of obtaining a location to mount::

        workspaceManager = s2.manage_workspaces(access_token, base_url=base_url)
        workspaceGroup = workspaceManager.get_workspace_group(workspace_group)
        SinglestoreFS(workspaceGroup.stage)
    """

    def __init__(self, fileLocation: FileLocation):
        super(SinglestoreFS, self).__init__()
        self.next_id = pyfuse3.ROOT_INODE + 1
        self.stage = fileLocation
        self.inodes = {
            pyfuse3.ROOT_INODE: Inode(self, None, "", pyfuse3.ROOT_INODE),
        }

    # -- helpers ----------------------------------------------------------

    def _inode(self, id):
        """Return the cached inode for *id*, raising ENOENT if unknown."""
        try:
            return self.inodes[id]
        except KeyError:
            raise pyfuse3.FUSEError(errno.ENOENT) from None

    def _dir_inode(self, id):
        """Return the inode for *id*, raising ENOTDIR unless it is a directory."""
        inode = self._inode(id)
        if not inode.isDir():
            raise pyfuse3.FUSEError(errno.ENOTDIR)
        return inode

    def _file_inode(self, id):
        """Return the inode for *id*, raising EISDIR unless it is a file."""
        inode = self._inode(id)
        if not inode.isFile():
            raise pyfuse3.FUSEError(errno.EISDIR)
        return inode

    @staticmethod
    def _to_ns(moment):
        """Convert an optional datetime to integer nanoseconds (0 for None)."""
        if moment is None:
            return 0
        # The *_ns attributes of EntryAttributes hold integers, not floats.
        return int(moment.timestamp() * 10**9)

    # -- attribute and directory handlers -----------------------------------

    async def getattr(self, id, ctx=None):
        """Return the ``EntryAttributes`` of inode *id* (ENOENT if unknown)."""
        inode = self._inode(id)
        info = self.stage.info(inode.getStagePath())

        entry = pyfuse3.EntryAttributes()
        if inode.isDir():
            entry.st_mode = stat.S_IFDIR | 0o555
            entry.st_size = 0
        else:
            entry.st_mode = stat.S_IFREG | 0o555
            entry.st_size = info.size
        if info.writable:
            entry.st_mode |= 0o222

        entry.st_atime_ns = time.time_ns()  # Current timestamp
        entry.st_ctime_ns = self._to_ns(info.created_at)
        entry.st_mtime_ns = self._to_ns(info.last_modified_at)
        entry.st_gid = os.getgid()  # TODO: check
        entry.st_uid = os.getuid()  # TODO: check
        entry.st_ino = id

        return entry

    async def lookup(self, parent_id, name, ctx=None):
        """Return the attributes of entry *name* (bytes) in directory *parent_id*.

        Raises ``FUSEError(ENOENT)`` when no child has that name.
        """
        parent_inode = self._dir_inode(parent_id)
        wanted = name.decode()

        for child_id in parent_inode.children():
            if self.inodes[child_id].getNameWithoutTrailingSlash() == wanted:
                return await self.getattr(child_id)
        raise pyfuse3.FUSEError(errno.ENOENT)

    async def opendir(self, id, ctx):
        """Return *id* itself as the directory handle."""
        self._dir_inode(id)
        return id

    async def readdir(self, fh, start_id, token):
        """Report the children of directory *fh* whose id is above *start_id*.

        Inode ids double as the resumption marker, so children are visited
        in ascending id order regardless of their position in the cache.
        """
        inode = self._dir_inode(fh)

        for child_id in sorted(inode.children()):
            if child_id <= start_id:
                continue
            child = self.inodes[child_id]
            accepted = pyfuse3.readdir_reply(
                token,
                child.getNameWithoutTrailingSlash().encode(),
                await self.getattr(child_id),
                child_id,
            )
            if not accepted:
                # Reply buffer is full; the kernel asks again with the id of
                # the last accepted entry as start_id.
                break

    # -- file handlers ------------------------------------------------------

    async def open(self, id, flags, ctx):
        """Return a ``FileInfo`` whose handle is the inode id."""
        self._inode(id)
        return pyfuse3.FileInfo(fh=id)

    async def read(self, fh, off, size):
        """Return up to *size* bytes of file *fh* starting at offset *off*."""
        inode = self._file_inode(fh)
        content = self.stage.download_file(inode.getStagePath())
        return content[off:off + size]

    async def create(self, parent_id, name, mode, flags, ctx):
        """Create an empty file *name* in *parent_id*.

        Returns the tuple ``(FileInfo, EntryAttributes)`` of the new file.
        """
        parent_inode = self._dir_inode(parent_id)
        # List the directory BEFORE creating the file: otherwise a first-time
        # listing would already contain it and it would get a second inode.
        siblings = parent_inode.children()

        file_name = name.decode()
        self.stage.open(parent_inode.getStagePath() + file_name, "w").close()

        inode = Inode(self, parent_inode, file_name)
        self.inodes[inode.id] = inode
        siblings.append(inode.id)
        return pyfuse3.FileInfo(fh=inode.id), await self.getattr(inode.id)

    async def setattr(self, id, attr, fields, fh, ctx):
        """Apply a size change (truncate/extend) and return the attributes.

        Other attribute changes (mode, owner, timestamps) are ignored.
        """
        if fields.update_size:
            inode = self._file_inode(id)
            path = inode.getStagePath()
            content = self.stage.download_file(path)
            if len(content) != attr.st_size:
                # Cut to the new size, or pad with zero bytes when growing.
                resized = content[:attr.st_size].ljust(attr.st_size, b"\0")
                with self.stage.open(path, "wb") as f:
                    f.write(resized)
        return await self.getattr(id)

    async def write(self, fh, offset, data):
        """Write *data* into file *fh* at *offset*; return ``len(data)``.

        The file is downloaded, patched in memory and uploaded again.
        """
        inode = self._file_inode(fh)
        path = inode.getStagePath()

        content = self.stage.download_file(path)
        if offset > len(content):
            # Writing past the end leaves a hole that reads back as zeros.
            content = content.ljust(offset, b"\0")
        patched = content[:offset] + data + content[offset + len(data):]

        with self.stage.open(path, "wb") as f:
            f.write(patched)
        # FUSE expects the number of bytes taken from *data*, not the size
        # of the uploaded file.
        return len(data)

    async def unlink(self, parent_id, name, ctx):
        """Remove file *name* from directory *parent_id*."""
        parent_inode = self._dir_inode(parent_id)
        attr = await self.lookup(parent_id, name)
        inode = self.inodes[attr.st_ino]
        if inode.isDir():
            raise pyfuse3.FUSEError(errno.EISDIR)

        self.stage.remove(inode.getStagePath())
        parent_inode.children().remove(inode.id)
        del self.inodes[inode.id]

    # -- directory modification handlers -------------------------------------

    async def mkdir(self, parent_id, name, mode, ctx):
        """Create directory *name* in *parent_id* and return its attributes."""
        parent_inode = self._dir_inode(parent_id)
        # Same ordering concern as in create(): list first, then create.
        siblings = parent_inode.children()

        dir_name = name.decode() + "/"
        self.stage.mkdir(parent_inode.getStagePath() + dir_name)

        inode = Inode(self, parent_inode, dir_name)
        self.inodes[inode.id] = inode
        siblings.append(inode.id)
        return await self.getattr(inode.id)

    async def rename(self, parent_id, name, newparent_id, newname, flags=0, ctx=None):
        """Move entry *name* of *parent_id* to *newname* in *newparent_id*.

        pyfuse3 calls this handler with ``(..., flags, ctx)``. The previous
        five-argument form ``(..., ctx)`` is still accepted. Non-zero *flags*
        (exchange / no-replace) are not supported and raise EINVAL.
        """
        if ctx is None and not isinstance(flags, int):
            # Old call style: the fifth positional argument was the context.
            flags, ctx = 0, flags
        if flags:
            raise pyfuse3.FUSEError(errno.EINVAL)

        parent_inode = self._dir_inode(parent_id)
        newparent_inode = self._dir_inode(newparent_id)
        attr = await self.lookup(parent_id, name)
        inode = self.inodes[attr.st_ino]
        # List the target BEFORE moving so the moved entry is not picked up
        # a second time under a fresh inode id.
        new_siblings = newparent_inode.children()

        target_name = newname.decode()
        # NOTE(review): the destination is passed without a trailing slash
        # even for directories, as before - confirm against the stage API.
        self.stage.rename(
            inode.getStagePath(),
            newparent_inode.getStagePath() + target_name,
        )

        # Directories are recognised by their trailing slash; keep it so the
        # renamed inode is still treated as a directory.
        cached_name = target_name + "/" if inode.isDir() else target_name
        parent_inode.children().remove(inode.id)
        inode.parent = newparent_inode
        inode.name = cached_name
        new_siblings.append(inode.id)

    async def rmdir(self, parent_id, name, ctx):
        """Remove directory *name* from directory *parent_id*."""
        parent_inode = self._dir_inode(parent_id)
        attr = await self.lookup(parent_id, name)
        inode = self.inodes[attr.st_ino]
        if not inode.isDir():
            raise pyfuse3.FUSEError(errno.ENOTDIR)

        self.stage.rmdir(inode.getStagePath())
        parent_inode.children().remove(inode.id)
        del self.inodes[inode.id]
560+
352561
class FileLocation(ABC):
353562
@abstractmethod
354563
def open(
@@ -472,6 +681,20 @@ def __str__(self) -> str:
472681
def __repr__(self) -> str:
473682
pass
474683

684+
def mount(self, mountpoint) -> None:
    """Mount this file location at *mountpoint* and serve it until unmounted.

    The call blocks while the FUSE main loop runs. The mount is always
    released on the way out. A keyboard interrupt is treated as a normal
    shutdown request; any other error is re-raised after the mount has been
    released.
    """
    fs = SinglestoreFS(self)
    fuse_options = set(pyfuse3.default_options)
    pyfuse3.init(fs, mountpoint, fuse_options)

    try:
        trio.run(pyfuse3.main)
    except KeyboardInterrupt:
        # Ctrl-C is how a foreground mount is stopped.
        pass
    finally:
        pyfuse3.close(unmount=True)
475698

476699
class FilesManager(Manager):
477700
"""

0 commit comments

Comments
 (0)