docbroker class + tests + more filebroker test
This commit is contained in:
parent
c1bf80fe68
commit
15857b5ecc
87
papers/content.py
Normal file
87
papers/content.py
Normal file
@ -0,0 +1,87 @@
|
||||
import os
|
||||
|
||||
# files i/o
|
||||
|
||||
def check_file(path, fail=True):
|
||||
if fail:
|
||||
if not os.path.exists(path):
|
||||
raise IOError("File does not exist: {}.".format(path))
|
||||
if not os.path.isfile(path):
|
||||
raise IOError("{} is not a file.".format(path))
|
||||
return True
|
||||
else:
|
||||
return os.path.exists(path) and os.path.isfile(path)
|
||||
|
||||
def check_directory(path, fail=True):
|
||||
if fail:
|
||||
if not os.path.exists(path):
|
||||
raise IOError("File does not exist: {}.".format(path))
|
||||
if not os.path.isdir(path):
|
||||
raise IOError("{} is not a directory.".format(path))
|
||||
return True
|
||||
else:
|
||||
return os.path.exists(path) and os.path.isdir(path)
|
||||
|
||||
def read_file(filepath):
|
||||
check_file(filepath)
|
||||
with open(filepath, 'r') as f:
|
||||
s = f.read()
|
||||
return s
|
||||
|
||||
def write_file(filepath, data):
|
||||
check_directory(os.path.dirname(filepath))
|
||||
with open(filepath, 'w') as f:
|
||||
f.write(data)
|
||||
|
||||
|
||||
# dealing with formatless content
|
||||
|
||||
def get_content(self, path):
|
||||
"""Will be useful when we need to get content from url"""
|
||||
return read_file(path)
|
||||
|
||||
def move_content(self, source, target, overwrite = False):
|
||||
if source == target:
|
||||
return
|
||||
if not overwrite and os.path.exists(target):
|
||||
raise IOError('target file exists')
|
||||
shutil.move(source, target)
|
||||
|
||||
def copy_content(self, source, target, overwrite = False):
|
||||
if source == target:
|
||||
return
|
||||
if not overwrite and os.path.exists(target):
|
||||
raise IOError('target file exists')
|
||||
shutil.copy(source, target)
|
||||
|
||||
|
||||
# editor input
|
||||
|
||||
def editor_input(editor, initial="", suffix=None):
|
||||
"""Use an editor to get input"""
|
||||
if suffix is None:
|
||||
suffix = '.tmp'
|
||||
with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as temp_file:
|
||||
tfile_name = temp_file.name
|
||||
temp_file.write(initial)
|
||||
temp_file.flush()
|
||||
cmd = editor.split() # this enable editor command with option, e.g. gvim -f
|
||||
cmd.append(tfile_name)
|
||||
subprocess.call(cmd)
|
||||
with open(tfile_name) as temp_file:
|
||||
content = temp_file.read()
|
||||
os.remove(tfile_name)
|
||||
return content
|
||||
|
||||
def edit_file(editor, path_to_file, temporary=True):
|
||||
if temporary:
|
||||
check_file(path_to_file, fail=True)
|
||||
with open(path_to_file) as f:
|
||||
content = f.read()
|
||||
content = editor_input(editor, content)
|
||||
with open(path_to_file, 'w') as f:
|
||||
f.write(content)
|
||||
else:
|
||||
cmd = editor.split() # this enable editor command with option, e.g. gvim -f
|
||||
cmd.append(path_to_file)
|
||||
subprocess.call(cmd)
|
@ -1,36 +1,15 @@
|
||||
import os
|
||||
import urlparse
|
||||
|
||||
def check_file(path, fail=True):
|
||||
if fail:
|
||||
if not os.path.exists(path):
|
||||
raise IOError("File does not exist: {}.".format(path))
|
||||
if not os.path.isfile(path):
|
||||
raise IOError("{} is not a file.".format(path))
|
||||
return True
|
||||
else:
|
||||
return os.path.exists(path) and os.path.isfile(path)
|
||||
|
||||
def check_directory(path, fail=True):
|
||||
if fail:
|
||||
if not os.path.exists(path):
|
||||
raise IOError("File does not exist: {}.".format(path))
|
||||
if not os.path.isdir(path):
|
||||
raise IOError("{} is not a directory.".format(path))
|
||||
return True
|
||||
else:
|
||||
return os.path.exists(path) and os.path.isdir(path)
|
||||
|
||||
def read_file(filepath):
|
||||
check_file(filepath)
|
||||
with open(filepath, 'r') as f:
|
||||
s = f.read()
|
||||
return s
|
||||
|
||||
def write_file(filepath, data):
|
||||
check_directory(os.path.dirname(filepath))
|
||||
with open(filepath, 'w') as f:
|
||||
f.write(data)
|
||||
from .content import check_file, check_directory, read_file, write_file
|
||||
|
||||
def filter_filename(filename, ext):
|
||||
""" Return the filename without the extension if the extension matches ext.
|
||||
Otherwise return None
|
||||
"""
|
||||
pattern ='.*\{}$'.format(ext)
|
||||
if re.match(pattern, filename) is not None:
|
||||
return filename[:-len(ext)]
|
||||
|
||||
class FileBroker(object):
|
||||
""" Handles all access to meta and bib files of the repository.
|
||||
@ -66,33 +45,120 @@ class FileBroker(object):
|
||||
return read_file(filepath)
|
||||
|
||||
def push_metafile(self, citekey, metadata):
|
||||
"""Put content to disk. Will gladly override anything standing in its way."""
|
||||
filepath = os.path.join(self.metadir, citekey + '.yaml')
|
||||
write_file(filepath, metadata)
|
||||
|
||||
def push_bibfile(self, citekey, bibdata):
|
||||
"""Put content to disk. Will gladly override anything standing in its way."""
|
||||
filepath = os.path.join(self.bibdir, citekey + '.bibyaml')
|
||||
write_file(filepath, bibdata)
|
||||
|
||||
def push(self, citekey, metadata, bibdata):
|
||||
"""Put content to disk. Will gladly override anything standing in its way."""
|
||||
self.push_metafile(citekey, metadata)
|
||||
self.push_bibfile(citekey, bibdata)
|
||||
|
||||
def remove(self, citekey):
|
||||
metafilepath = os.path.join(self.metadir, citekey + '.yaml')
|
||||
os.remove(metafilepath)
|
||||
if check_file(metafilepath):
|
||||
os.remove(metafilepath)
|
||||
bibfilepath = os.path.join(self.bibdir, citekey + '.bibyaml')
|
||||
os.remove(bibfilepath)
|
||||
if check_file(bibfilepath):
|
||||
os.remove(bibfilepath)
|
||||
|
||||
def listing(self, filestats = True):
|
||||
def exists(self, citekey, both=True):
|
||||
if both:
|
||||
return (check_file(os.path.join(self.metadir, citekey + '.yaml'), fail=False) and
|
||||
check_file(os.path.join(self.bibdir, citekey + '.bibyaml'), fail=False))
|
||||
else:
|
||||
return (check_file(os.path.join(self.metadir, citekey + '.yaml'), fail=False) or
|
||||
check_file(os.path.join(self.bibdir, citekey + '.bibyaml'), fail=False))
|
||||
|
||||
|
||||
def listing(self, filestats=True):
|
||||
metafiles = []
|
||||
for filename in os.listdir(self.metadir):
|
||||
stats = os.stat(os.path.join(path, f))
|
||||
metafiles.append(filename, stats)
|
||||
citekey = filter_filename(filename, '.yaml')
|
||||
if citekey is not None:
|
||||
if filestats:
|
||||
stats = os.stat(os.path.join(path, filename))
|
||||
metafiles.append(citekey, stats)
|
||||
else:
|
||||
metafiles.append(citekey)
|
||||
|
||||
bibfiles = []
|
||||
for filename in os.listdir(self.bibdir):
|
||||
stats = os.stat(os.path.join(path, f))
|
||||
bibfiles.append(filename, stats)
|
||||
citekey = filter_filename(filename, '.bibyaml')
|
||||
if citekey is not None:
|
||||
if filestats:
|
||||
stats = os.stat(os.path.join(path, filename))
|
||||
bibfiles.append(citekey, stats)
|
||||
else:
|
||||
bibfiles.append(citekey)
|
||||
|
||||
return {'metafiles': metafiles, 'bibfiles': bibfiles}
|
||||
|
||||
|
||||
class DocBroker(object):
|
||||
""" DocBroker manages the document files optionally attached to the papers.
|
||||
|
||||
* only one document can be attached to a paper (might change in the future)
|
||||
* this document can be anything, the content is never processed.
|
||||
* these document have an adress of the type "pubsdir://doc/citekey.pdf"
|
||||
* document outside of the repository will not be removed.
|
||||
* deliberately, there is no move_doc method.
|
||||
"""
|
||||
|
||||
def __init__(self, directory):
|
||||
self.docdir = os.path.join(directory, 'doc')
|
||||
if not check_directory(self.docdir, fail = False):
|
||||
os.mkdir(self.docdir)
|
||||
|
||||
def is_pubsdir_doc(self, docpath):
|
||||
parsed = urlparse.urlparse(docpath)
|
||||
if parsed.scheme == 'pubsdir':
|
||||
assert parsed.netloc == 'doc'
|
||||
assert parsed.path[0] == '/'
|
||||
return parsed.scheme == 'pubsdir'
|
||||
|
||||
def copy_doc(self, citekey, source_path, overwrite=False):
|
||||
""" Copy a document to the pubsdir/doc, and return the location
|
||||
|
||||
The document will be named {citekey}.{ext}.
|
||||
The location will be pubsdir://doc/{citekey}.{ext}.
|
||||
:param overwrite: will overwrite existing file.
|
||||
:return: the above location
|
||||
"""
|
||||
full_source_path = self.real_docpath(source_path)
|
||||
check_file(full_source_path)
|
||||
|
||||
target_path = 'pubsdir://' + os.path.join('doc', citekey + os.path.splitext(source_path)[-1])
|
||||
full_target_path = self.real_docpath(target_path)
|
||||
if not overwrite and check_file(full_target_path, fail=False):
|
||||
raise IOError('{} file exists.'.format(full_target_path))
|
||||
shutil.copy(full_source_path, full_target_path)
|
||||
|
||||
return target_path
|
||||
|
||||
def remove_doc(self, docpath):
|
||||
""" Will remove only file hosted in pubsdir://doc/
|
||||
|
||||
:raise ValueError: for other paths.
|
||||
"""
|
||||
if not self.is_pubsdir_doc(docpath):
|
||||
raise ValueError(('the file to be removed {} is set as external. '
|
||||
'you should remove it manually.').format(docpath))
|
||||
filepath = self.real_docpath(docpath)
|
||||
if check_file(filepath):
|
||||
os.remove(filepath)
|
||||
|
||||
def real_docpath(self, docpath):
|
||||
"""Return the full path
|
||||
Essentially transform pubsdir://doc/{citekey}.{ext} to /path/to/pubsdir/doc/{citekey}.{ext}.
|
||||
Return absoluted paths of regular ones otherwise.
|
||||
"""
|
||||
if self.is_pubsdir_doc(docpath):
|
||||
parsed = urlparse.urlparse(docpath)
|
||||
docpath = os.path.join(self.docdir, parsed.path[1:])
|
||||
return os.path.normpath(os.path.abspath(docpath))
|
||||
|
@ -5,19 +5,19 @@ import os
|
||||
import testenv
|
||||
import fake_env
|
||||
|
||||
from papers import filebroker
|
||||
from papers import content, filebroker
|
||||
|
||||
class TestFakeFs(unittest.TestCase):
|
||||
"""Abstract TestCase intializing the fake filesystem."""
|
||||
|
||||
def setUp(self):
|
||||
self.fs = fake_env.create_fake_fs([filebroker])
|
||||
self.fs = fake_env.create_fake_fs([content, filebroker])
|
||||
|
||||
def tearDown(self):
|
||||
fake_env.unset_fake_fs([filebroker])
|
||||
fake_env.unset_fake_fs([content, filebroker])
|
||||
|
||||
|
||||
class TestEnDecode(TestFakeFs):
|
||||
class TestFileBroker(TestFakeFs):
|
||||
|
||||
def test_pushpull1(self):
|
||||
|
||||
@ -35,7 +35,7 @@ class TestEnDecode(TestFakeFs):
|
||||
|
||||
def test_existing_data(self):
|
||||
|
||||
fake_env.copy_dir(self.fs, os.path.join(os.path.dirname(__file__), 'tmpdir'), 'tmpdir')
|
||||
fake_env.copy_dir(self.fs, os.path.join(os.path.dirname(__file__), 'tmpdir'), 'tmpdir')
|
||||
fb = filebroker.FileBroker('tmpdir', create = True)
|
||||
|
||||
with open('tmpdir/bib/Page99.bibyaml', 'r') as f:
|
||||
@ -43,3 +43,68 @@ class TestEnDecode(TestFakeFs):
|
||||
|
||||
with open('tmpdir/meta/Page99.yaml', 'r') as f:
|
||||
self.assertEqual(fb.pull_metafile('Page99'), f.read())
|
||||
|
||||
def test_errors(self):
|
||||
|
||||
with self.assertRaises(IOError):
|
||||
filebroker.FileBroker('tmpdir', create = False)
|
||||
|
||||
fb = filebroker.FileBroker('tmpdir', create = True)
|
||||
with self.assertRaises(IOError):
|
||||
fb.pull_bibfile('Page99')
|
||||
with self.assertRaises(IOError):
|
||||
fb.pull_metafile('Page99')
|
||||
|
||||
def test_errors(self):
|
||||
|
||||
with self.assertRaises(IOError):
|
||||
filebroker.FileBroker('tmpdir', create = False)
|
||||
|
||||
fb = filebroker.FileBroker('tmpdir', create = True)
|
||||
|
||||
self.assertFalse(fb.exists('Page99'))
|
||||
with self.assertRaises(IOError):
|
||||
fb.pull_bibfile('Page99')
|
||||
with self.assertRaises(IOError):
|
||||
fb.pull_metafile('Page99')
|
||||
|
||||
def test_remove(self):
|
||||
|
||||
with self.assertRaises(IOError):
|
||||
filebroker.FileBroker('tmpdir', create = False)
|
||||
|
||||
fb = filebroker.FileBroker('tmpdir', create = True)
|
||||
|
||||
fb.push_bibfile('citekey1', 'abc')
|
||||
self.assertEqual(fb.pull_bibfile('citekey1'), 'abc')
|
||||
fb.push_metafile('citekey1', 'defg')
|
||||
self.assertEqual(fb.pull_metafile('citekey1'), 'defg')
|
||||
self.assertTrue(fb.exists('citekey1'))
|
||||
|
||||
fb.remove('citekey1')
|
||||
with self.assertRaises(IOError):
|
||||
self.assertEqual(fb.pull_bibfile('citekey1'), 'abc')
|
||||
with self.assertRaises(IOError):
|
||||
self.assertEqual(fb.pull_metafile('citekey1'), 'defg')
|
||||
self.assertFalse(fb.exists('citekey1'))
|
||||
|
||||
|
||||
class TestDocBroker(TestFakeFs):
|
||||
|
||||
def test_doccopy(self):
|
||||
|
||||
fake_env.copy_dir(self.fs, os.path.join(os.path.dirname(__file__), 'data'), 'data')
|
||||
|
||||
fb = filebroker.FileBroker('tmpdir', create = True)
|
||||
docb = filebroker.DocBroker('tmpdir')
|
||||
|
||||
docpath = docb.copy_doc('Page99', 'data/pagerank.pdf')
|
||||
self.assertTrue(content.check_file(os.path.join('tmpdir', 'doc/Page99.pdf')))
|
||||
|
||||
self.assertTrue(docb.is_pubsdir_doc(docpath))
|
||||
self.assertEqual(docpath, 'pubsdir://doc/Page99.pdf')
|
||||
docb.remove_doc('pubsdir://doc/Page99.pdf')
|
||||
|
||||
self.assertFalse(content.check_file(os.path.join('tmpdir', 'doc/Page99.pdf'), fail=False))
|
||||
with self.assertRaises(IOError):
|
||||
self.assertFalse(content.check_file(os.path.join('tmpdir', 'doc/Page99.pdf'), fail=True))
|
||||
|
Loading…
x
Reference in New Issue
Block a user