first test of new repo passing

main
Fabien Benureau 12 years ago
parent 320a488667
commit a354e79523

@ -1,13 +1,14 @@
import os import os
import shutil import shutil
import glob import glob
import itertools
from . import files from . import files
from .paper import PaperInRepo, NoDocumentFile from .paper import PaperInRepo, NoDocumentFile
from . import configs from . import configs
ALPHABET = 'abcdefghijklmopqrstuvwxyz'
BASE_FILE = 'papers.yaml' BASE_FILE = 'papers.yaml'
BIB_DIR = 'bibdata' BIB_DIR = 'bibdata'
META_DIR = 'meta' META_DIR = 'meta'
@ -24,24 +25,65 @@ class InvalidReference(Exception):
class Repository(object): class Repository(object):
def __init__(self, config=None): def __init__(self, config, load = True):
self.papersdir = None """Initialize the repository.
self.citekeys = []
if config is None: :param load: if load is True, load the repository from disk,
config = configs.CONFIG from path config.papers_dir.
"""
self.config = config self.config = config
self.citekeys = []
def has_paper(self, citekey): # @classmethod
# def from_directory(cls, config, papersdir=None):
# repo = cls(config)
# if papersdir is None:
# papersdir = config.papers_dir
# repo.papersdir = files.clean_path(papersdir)
# repo.load()
# return repo
def __contains__(self, citekey):
"""Allows to use 'if citekey in repo' pattern"""
return citekey in self.citekeys return citekey in self.citekeys
def get_paper(self, citekey): def __len__(self):
"""Load a paper by its citekey from disk, if necessary.""" return len(self.citekeys)
return PaperInRepo.load(
self, self.path_to_paper_file(citekey, 'bib'),
metapath=self.path_to_paper_file(citekey, 'meta')) # load, save repo
def init_dirs(self):
"""Create, if necessary, the repository directories.
Can be called more than once.
"""
self.bib_dir = files.clean_path(self.config.papers_dir, BIB_DIR)
self.meta_dir = files.clean_path(self.config.papers_dir, META_DIR)
self.doc_dir = files.clean_path(self.config.doc_dir)
self.cfg_path = files.clean_path(self.config.papers_dir, 'papers.yaml')
for d in [self.bib_dir, self.meta_dir, self.doc_dir]:
if not os.path.exists(d):
os.makedirs(d)
def load(self):
"""Load the repository, creating dirs if necessary"""
self.init_dirs()
repo_config = files.read_yamlfile(self.cfg_path)
self.citekeys = repo_config['citekeys']
def save(self):
"""Save the repo, creating dirs if necessary"""
self.init_dirs()
repo_cfg = {'citekeys': self.citekeys}
files.write_yamlfile(self.cfg_path, repo_cfg)
def citekey_from_ref(self, ref): # reference
"""Tries to get citekey from given ref.
def ref2citekey(self, ref):
"""Tries to get citekey from given reference.
Ref can be a citekey or a number. Ref can be a citekey or a number.
""" """
if ref in self.citekeys: if ref in self.citekeys:
@ -52,139 +94,80 @@ class Repository(object):
except (IndexError, ValueError): except (IndexError, ValueError):
raise(InvalidReference) raise(InvalidReference)
# creating new papers
def add_paper(self, p): # papers
def all_papers(self):
for key in self.citekeys:
yield self.get_paper(key)
def get_paper(self, citekey):
"""Load a paper by its citekey from disk, if necessary."""
return PaperInRepo.load(self, self._bibfile(citekey),
self._metafile(citekey))
# add, remove papers
def add_paper(self, p, overwrite = False):
if p.citekey is None: # TODO also test if citekey is valid if p.citekey is None: # TODO also test if citekey is valid
raise(ValueError("Invalid citekey: %s." % p.citekey)) raise(ValueError("Invalid citekey: %s." % p.citekey))
elif self.has_paper(p.citekey): if not overwrite and p.citekey in self:
raise(ValueError("Citekey already exists in repository: %s" raise(ValueError('citekey {} already in use'.format(p.citekey)))
% p.citekey))
self.citekeys.append(p.citekey) self.citekeys.append(p.citekey)
# write paper files
self.save_paper(p) self.save_paper(p)
# update repository files
self.save() self.save()
# TODO change to logging system (17/12/2012) # TODO change to logging system (17/12/2012)
print "Added: %s" % p.citekey print "Added: %s" % p.citekey
def add_or_update(self, paper): def rename_paper(self, paper, new_citekey, overwrite=False):
if not self.has_paper(paper.citekey): """Modify the citekey of a paper, and propagate changes to disk"""
self.add_paper(paper) if paper.citekey not in self:
else: raise(ValueError, 'paper {} not in repository'.format(p.citekey))
self.save_paper(paper) if not overwrite and new_citekey not in self:
raise(ValueError, 'citekey {} already in use'.format(new_citekey))
paper.remove_paper(paper.citekey, remove_doc = False)
paper.citekey = new_citekey
self.add_paper(paper, overwrite = overwrite)
def update(self, paper, old_citekey=None, overwrite=False): def _bibfile(self, citekey):
"""Updates a paper, eventually changing its citekey. return os.path.join(self.bib_dir, citekey + '.bibyaml')
The paper should be in repository. If the citekey changes,
the new citekey should be free except if the overwrite argument def _metafile(self, citekey):
is set to True. return os.path.join(self.meta_dir, citekey + '.meta')
"""
if old_citekey is None: def remove_paper(self, citekey, remove_doc = True):
old_citekey = paper.citekey
if old_citekey not in self.citekeys:
raise(ValueError, 'Paper not in repository. Add first')
else:
if paper.citekey == old_citekey:
self.save_paper(paper)
else:
if self.has_paper(paper.citekey):
if not overwrite:
raise(CiteKeyAlreadyExists,
"There is already a paper with citekey: %s."
% paper.citekey)
else:
self.save_paper(paper)
else:
self.add_paper(paper)
# Eventually move document file
paper = PaperInRepo.from_paper(paper, self)
try:
path = self.find_document(old_citekey)
self.import_document(paper.citekey, path)
except NoDocumentFile:
pass
self.remove(old_citekey)
def remove(self, citekey):
paper = self.get_paper(citekey) paper = self.get_paper(citekey)
self.citekeys.remove(citekey) self.citekeys.remove(citekey)
self.save() os.remove(self._metafile(citekey))
for f in ('bib', 'meta'): os.remove(self._bibfile(citekey))
os.remove(self.path_to_paper_file(citekey, f))
# Eventually remove associated document # Eventually remove associated document
try: if remove_doc:
path = paper.get_document_path_in_repo() try:
os.remove(path) path = paper.get_document_path_in_repo()
except NoDocumentFile: os.remove(path)
pass except NoDocumentFile:
pass
self.save()
def save_paper(self, paper): def save_paper(self, paper):
if not self.has_paper(paper.citekey): if not paper.citekey in self:
raise(ValueError('Paper not in repository, first add it.')) raise(ValueError('Paper not in repository, first add it.'))
paper.save_to_disc(self.path_to_paper_file(paper.citekey, 'bib'), paper.save(self._bibfile(paper.citekey),
self.path_to_paper_file(paper.citekey, 'meta')) self._metafile(paper.citekey))
def get_free_citekey(self, paper, citekey=None): def generate_citekey(self, paper, citekey=None):
"""Create a unique citekey for the given paper. """Create a unique citekey for the given paper."""
"""
if citekey is None: if citekey is None:
citekey = paper.generate_citekey() citekey = paper.generate_citekey()
num = [] for n in itertools.count():
while citekey + _to_suffix(num) in self.citekeys: if not citekey + _base27(n) in self.citekeys:
_str_incr(num) return citekey + _base27(n)
return citekey + _to_suffix(num)
def base_file_path(self):
return os.path.join(self.papersdir, 'papers.yaml')
def size(self):
return len(self.citekeys)
def save(self):
papers_config = {'citekeys': self.citekeys}
files.write_yamlfile(self.base_file_path(), papers_config)
def load(self):
papers_config = files.read_yamlfile(self.base_file_path())
self.citekeys = papers_config['citekeys']
def init(self, papersdir):
self.papersdir = papersdir
os.makedirs(os.path.join(self.papersdir, BIB_DIR))
os.makedirs(os.path.join(self.papersdir, META_DIR))
doc_dir = self.get_document_directory()
if not os.path.exists(doc_dir):
os.makedirs(doc_dir)
self.save()
def path_to_paper_file(self, citekey, file_):
if file_ == 'bib':
return os.path.join(self.papersdir, BIB_DIR, citekey + '.bibyaml')
elif file_ == 'meta':
return os.path.join(self.papersdir, META_DIR, citekey + '.meta')
else:
raise(ValueError("%s is not a valid paper file." % file_))
def get_document_directory(self):
if self.config.has_option(configs.MAIN_SECTION, 'document-directory'):
doc_dir = self.config.get(configs.MAIN_SECTION,
'document-directory')
else:
doc_dir = os.path.join(self.papersdir, DOC_DIR)
return files.clean_path(doc_dir)
def find_document(self, citekey):
doc_dir = self.get_document_directory()
found = glob.glob(doc_dir + "/%s.*" % citekey)
if found:
return found[0]
else:
raise NoDocumentFile
def all_papers(self):
for key in self.citekeys:
yield self.get_paper(key)
def import_document(self, citekey, doc_file): def import_document(self, citekey, doc_file):
if citekey not in self.citekeys: if citekey not in self.citekeys:
@ -204,35 +187,12 @@ class Repository(object):
tags = tags.union(p.tags) tags = tags.union(p.tags)
return tags return tags
@classmethod
def from_directory(cls, config, papersdir=None):
repo = cls(config=config)
if papersdir is None:
papersdir = config.get(configs.MAIN_SECTION, 'papers-directory')
repo.papersdir = files.clean_path(papersdir)
repo.load()
return repo
def _char_incr(c):
return chr(ord(c) + 1)
def _str_incr(l): def _base27(n):
"""Increment a number in a list string representation. return _base27((n-1) // 26) + chr(97+((n-1)% 26)) if n else ''
Numbers are represented in base 26 with letters as digits.
"""
pos = 0
while pos < len(l):
if l[pos] == 'z':
l[pos] = 'a'
pos += 1
else:
l[pos] = _char_incr(l[pos])
return
l.append('a')
def _to_suffix(l): def _base(num, b):
return ''.join(l[::-1]) q, r = divmod(num - 1, len(b))
return _base(q, b) + b[r] if num else ''

@ -3,31 +3,27 @@ import tempfile
import shutil import shutil
import os import os
import testenv
import fixtures import fixtures
from papers.repo import (Repository, _str_incr, _to_suffix, BIB_DIR, META_DIR, from papers.repo import (Repository, _base27, BIB_DIR, META_DIR,
CiteKeyAlreadyExists) CiteKeyAlreadyExists)
from papers.paper import PaperInRepo from papers.paper import PaperInRepo
from papers import configs
class TestCitekeyGeneration(unittest.TestCase): class TestCitekeyGeneration(unittest.TestCase):
def test_string_increment(self): def test_string_increment(self):
l = [] self.assertEqual(_base27(0), '')
self.assertEqual(_to_suffix(l), '') for i in range(26):
_str_incr(l) self.assertEqual(_base27(i+1), chr(97+i))
self.assertEqual(_to_suffix(l), 'a') self.assertEqual(_base27(26+i+1), 'a' + chr(97+i))
_str_incr(l)
self.assertEqual(_to_suffix(l), 'b')
l = ['z']
_str_incr(l)
self.assertEqual(_to_suffix(l), 'aa')
def test_generated_key_is_unique(self): def test_generated_key_is_unique(self):
repo = Repository() repo = Repository(configs.Config())
repo.citekeys = ['Turing1950', 'Doe2003'] repo.citekeys = ['Turing1950', 'Doe2003']
c = repo.get_free_citekey(fixtures.turing1950) c = repo.generate_citekey(fixtures.turing1950)
repo.citekeys.append('Turing1950a') repo.citekeys.append('Turing1950a')
c = repo.get_free_citekey(fixtures.turing1950) c = repo.generate_citekey(fixtures.turing1950)
self.assertEqual(c, 'Turing1950b') self.assertEqual(c, 'Turing1950b')
@ -35,7 +31,7 @@ class TestRepo(unittest.TestCase):
def setUp(self): def setUp(self):
self.tmpdir = tempfile.mkdtemp() self.tmpdir = tempfile.mkdtemp()
self.repo = Repository() self.repo = Repository(configs.Config())
self.repo.init(self.tmpdir) self.repo.init(self.tmpdir)
self.repo.add_paper(fixtures.turing1950) self.repo.add_paper(fixtures.turing1950)

Loading…
Cancel
Save