#-------------------------------------------------------------------
# Python implementation of ScrutariDataExport API
# http://www.scrutari.net/dokuwiki/scrutaridata:exportapi
#
# Copyright (c) 2015-2016 Vincent Calame - Exemole
# Licensed under MIT (http://en.wikipedia.org/wiki/MIT_License)
#
# Date : 2016-09-07
# Svn-Revision : 1214
#-------------------------------------------------------------------

import collections
import os
import time
from collections import OrderedDict
from xml.dom.minidom import parse


#-------------------------------------------------------------------
# Constants
#-------------------------------------------------------------------

# States of the ScrutariDataExport state machine.
START = 1
BASEMETADATA = 2
CORPUSMETADATA = 3
FICHE = 4
THESAURUSMETADATA = 5
MOTCLE = 6
END = 7

# Title ("intitule") types accepted by the setIntitule() methods.
INTITULE_SHORT = 1
INTITULE_LONG = 2
INTITULE_CORPUS = 1
INTITULE_FICHE = 2
INTITULE_THESAURUS = 1

# Characters replaced by XmlWriter.escape() and their XML replacement.
_XML_ESCAPES = {
    '<': '&lt;',
    '>': '&gt;',
    '&': '&amp;',
    '\'': '&apos;',
    '"': '&quot;',
    '\u00A0': '&#x00A0;',
}


#-------------------------------------------------------------------
# Private helpers
#-------------------------------------------------------------------

def _stateMessage(methodName, state):
    """Build the 'method / state' text used by the state-check errors."""
    return methodName + ' / ' + str(state)


def _wrongIntituleType(intituleType):
    """Build the exception raised for an unknown intitule type."""
    return Exception("Wrong intituleType = " + str(intituleType))


def _writeIntituleMap(xmlWriter, suffix, intituleMap):
    """Write an <intitule-SUFFIX> element with one <lib> child per language.

    Nothing is written when intituleMap is empty.
    """
    if len(intituleMap) == 0:
        return
    tagName = "intitule-" + suffix
    xmlWriter.openTag(tagName)
    for lang, text in intituleMap.items():
        xmlWriter.addLibElement(lang, text)
    xmlWriter.closeTag(tagName)


def _hasText(value):
    """Return True when value is not None and its string form is not blank."""
    return value is not None and len(str(value).strip()) > 0


def _toInt(value):
    """Return value as an int when it is an int or a string of digits, else None."""
    if isinstance(value, bool):
        return None
    if isinstance(value, int):
        return value
    try:
        text = value.strip()
    except AttributeError:
        return None
    if text.isdigit():
        return int(text)
    return None


#-------------------------------------------------------------------
# Classes
#-------------------------------------------------------------------

class ScrutariDataExport(object):
    """Entry point of the export: a state machine writing the <base> document.

    See http://www.scrutari.net/dokuwiki/scrutaridata:exportapi:scrutaridataexport

    xmlWriter is an instance of the XmlWriter utility (or any object
    offering the same methods).
    """

    def __init__(self, xmlWriter):
        self.xmlWriter = xmlWriter
        self.currentState = START
        # Builder whose XML is still pending; written by flushXmlBuilder().
        self.xmlBuilder = None
        # The same FicheExport / MotcleExport instances are reused for every
        # fiche / motcle (they are reset through reinit()).
        self.ficheExport = FicheExport()
        self.motcleExport = MotcleExport()
        # "corpus\tthesaurus" -> OrderedDict("ficheId\tmotcleId" -> poids)
        self.indexationMap = OrderedDict()

    def startExport(self):
        """Open the <base> element and return the BaseMetadataExport to fill.

        Raises Exception when the export has already been started.
        """
        if self.currentState != START:
            raise Exception(_stateMessage('startExport', self.currentState))
        self.xmlWriter.openTag("base")
        self.currentState = BASEMETADATA
        baseMetadataExport = BaseMetadataExport()
        self.xmlBuilder = baseMetadataExport
        return baseMetadataExport

    def newCorpus(self, corpusName):
        """Close the current subset, open a <corpus> and return its metadata builder."""
        self.commonTest('newCorpus')
        self.flushXmlBuilder()
        self.flushSubset()
        self.xmlWriter.openTagWithAttribute("corpus", "corpus-name", corpusName)
        self.currentState = CORPUSMETADATA
        corpusMetadataExport = CorpusMetadataExport()
        self.xmlBuilder = corpusMetadataExport
        return corpusMetadataExport

    def newFiche(self, ficheId):
        """Start a new fiche in the current corpus and return the FicheExport.

        Raises Exception when no corpus is currently open.
        """
        if self.currentState not in (FICHE, CORPUSMETADATA):
            raise Exception(_stateMessage('newFiche', self.currentState))
        self.flushXmlBuilder()
        self.currentState = FICHE
        self.ficheExport.reinit(ficheId)
        self.xmlBuilder = self.ficheExport
        return self.ficheExport

    def newThesaurus(self, thesaurusName):
        """Close the current subset, open a <thesaurus> and return its metadata builder."""
        self.commonTest('newThesaurus')
        self.flushXmlBuilder()
        self.flushSubset()
        self.xmlWriter.openTagWithAttribute("thesaurus", "thesaurus-name", thesaurusName)
        self.currentState = THESAURUSMETADATA
        thesaurusMetadataExport = ThesaurusMetadataExport()
        self.xmlBuilder = thesaurusMetadataExport
        return thesaurusMetadataExport

    def newMotcle(self, motcleId):
        """Start a new motcle in the current thesaurus and return the MotcleExport.

        Raises Exception when no thesaurus is currently open.
        """
        if self.currentState not in (MOTCLE, THESAURUSMETADATA):
            raise Exception(_stateMessage('newMotcle', self.currentState))
        self.flushXmlBuilder()
        self.currentState = MOTCLE
        self.motcleExport.reinit(motcleId)
        self.xmlBuilder = self.motcleExport
        return self.motcleExport

    def addIndexation(self, corpusName, ficheId, thesaurusName, motcleId, poids):
        """Record a fiche/motcle link; it is written later by endExport().

        Recording the same fiche/motcle pair twice keeps the last poids.
        """
        self.commonTest('addIndexation')
        subsetKey = corpusName + "\t" + thesaurusName
        itemKey = str(ficheId) + "\t" + str(motcleId)
        if subsetKey in self.indexationMap:
            self.indexationMap[subsetKey][itemKey] = poids
        else:
            self.indexationMap[subsetKey] = OrderedDict([(itemKey, poids)])

    def endExport(self):
        """Flush pending XML, write the indexation groups and close <base>."""
        self.commonTest('endExport')
        self.flushXmlBuilder()
        self.flushSubset()
        self.writeIndexation()
        self.xmlWriter.closeTag("base")
        self.currentState = END

    def getState(self):
        """Return the current state constant (START ... END)."""
        return self.currentState

    def commonTest(self, methodName):
        """Raise Exception when the export is not started or already ended."""
        if self.currentState == START or self.currentState == END:
            raise Exception(_stateMessage(methodName, self.currentState))

    def flushXmlBuilder(self):
        """Write the pending builder (if any) to the XML writer."""
        if self.xmlBuilder is not None:
            self.xmlBuilder.writeXML(self.xmlWriter)
            self.xmlBuilder = None

    def flushSubset(self):
        """Close the <corpus> or <thesaurus> element that is currently open."""
        if self.currentState == FICHE or self.currentState == CORPUSMETADATA:
            self.xmlWriter.closeTag("corpus")
        elif self.currentState == MOTCLE or self.currentState == THESAURUSMETADATA:
            self.xmlWriter.closeTag("thesaurus")

    def writeIndexation(self):
        """Write one <indexation-group> per corpus/thesaurus pair.

        The "poids" attribute is only written when it is greater than 1.
        """
        for subsetKey, idxMap in self.indexationMap.items():
            corpusPath, _, thesaurusPath = subsetKey.partition("\t")
            groupAttributes = OrderedDict([
                ("corpus-path", corpusPath),
                ("thesaurus-path", thesaurusPath),
            ])
            self.xmlWriter.openTagWithAttributes("indexation-group", groupAttributes)
            for itemKey, poids in idxMap.items():
                ficheId, _, motcleId = itemKey.partition("\t")
                # A fresh map per item keeps the attribute order stable
                # (fiche-id, motcle-id, poids) without mutating shared state.
                attributes = OrderedDict([
                    ("fiche-id", ficheId),
                    ("motcle-id", motcleId),
                ])
                if poids is not None and poids > 1:
                    attributes["poids"] = poids
                self.xmlWriter.addEmptyElement("indexation", attributes)
            self.xmlWriter.closeTag("indexation-group")


class BaseMetadataExport(object):
    """Builder of the <base-metadata> element.

    See http://www.scrutari.net/dokuwiki/scrutaridata:exportapi:basemetadataexport
    """

    def __init__(self):
        self.authority = ""
        # Initialised here so that writeXML() works even when
        # setBaseName() was never called.
        self.baseName = ""
        self.baseIcon = ""
        self.shortMap = OrderedDict()
        self.longMap = OrderedDict()
        self.langUIArray = []

    def setAuthority(self, authority):
        self.authority = authority

    def setBaseName(self, baseName):
        self.baseName = baseName

    def setBaseIcon(self, baseIcon):
        self.baseIcon = baseIcon

    def setIntitule(self, intituleType, lang, intituleValue):
        """Set the short or long title for lang.

        Raises Exception when intituleType is neither INTITULE_SHORT
        nor INTITULE_LONG.
        """
        if intituleType == INTITULE_SHORT:
            self.shortMap[lang] = intituleValue
        elif intituleType == INTITULE_LONG:
            self.longMap[lang] = intituleValue
        else:
            raise _wrongIntituleType(intituleType)

    def addLangUI(self, lang):
        self.langUIArray.append(lang)

    def writeXML(self, xmlWriter):
        """Write the <base-metadata> element; empty values are skipped by the writer."""
        xmlWriter.openTag("base-metadata")
        xmlWriter.addSimpleElement("authority", self.authority)
        xmlWriter.addSimpleElement("base-name", self.baseName)
        xmlWriter.addSimpleElement("base-icon", self.baseIcon)
        self.addMap(INTITULE_SHORT, self.shortMap, xmlWriter)
        self.addMap(INTITULE_LONG, self.longMap, xmlWriter)
        if len(self.langUIArray) > 0:
            xmlWriter.openTag("langs-ui")
            for lang in self.langUIArray:
                xmlWriter.addSimpleElement("lang", lang)
            xmlWriter.closeTag("langs-ui")
        xmlWriter.closeTag("base-metadata")

    def addMap(self, intituleType, libMap, xmlWriter):
        """Write <intitule-short> or <intitule-long> when libMap is not empty."""
        suffix = ""
        if intituleType == INTITULE_SHORT:
            suffix = "short"
        elif intituleType == INTITULE_LONG:
            suffix = "long"
        _writeIntituleMap(xmlWriter, suffix, libMap)


class CorpusMetadataExport(object):
    """Builder of the <corpus-metadata> element.

    See http://www.scrutari.net/dokuwiki/scrutaridata:exportapi:corpusmetadataexport
    """

    def __init__(self):
        self.corpusIcon = ""
        self.hrefParent = ""
        self.corpusMap = OrderedDict()
        self.ficheMap = OrderedDict()
        self.complementMapArray = []

    def setCorpusIcon(self, corpusIcon):
        self.corpusIcon = corpusIcon

    def setHrefParent(self, hrefParent):
        self.hrefParent = hrefParent

    def setIntitule(self, intituleType, lang, intituleValue):
        """Set the corpus or fiche title for lang.

        Raises Exception when intituleType is neither INTITULE_CORPUS
        nor INTITULE_FICHE.
        """
        if intituleType == INTITULE_CORPUS:
            self.corpusMap[lang] = intituleValue
        elif intituleType == INTITULE_FICHE:
            self.ficheMap[lang] = intituleValue
        else:
            raise _wrongIntituleType(intituleType)

    def addComplement(self):
        """Declare a new complement and return its 1-based number."""
        self.complementMapArray.append(OrderedDict())
        return len(self.complementMapArray)

    def setComplementIntitule(self, complementNumber, lang, intituleValue):
        """Set the title of a declared complement; unknown numbers are ignored.

        complementNumber is 1-based, as returned by addComplement().
        """
        # Numbers below 1 are rejected: 0 would otherwise address index -1,
        # i.e. silently modify the last complement.
        if complementNumber < 1 or complementNumber > len(self.complementMapArray):
            return
        self.complementMapArray[complementNumber - 1][lang] = intituleValue

    def writeXML(self, xmlWriter):
        """Write the <corpus-metadata> element."""
        xmlWriter.openTag("corpus-metadata")
        self.addMap(INTITULE_CORPUS, self.corpusMap, xmlWriter)
        self.addMap(INTITULE_FICHE, self.ficheMap, xmlWriter)
        xmlWriter.addSimpleElement("href-parent", self.hrefParent)
        xmlWriter.addSimpleElement("corpus-icon", self.corpusIcon)
        for complementMap in self.complementMapArray:
            xmlWriter.openTag("complement-metadata")
            for lang, text in complementMap.items():
                xmlWriter.addLibElement(lang, text)
            xmlWriter.closeTag("complement-metadata")
        xmlWriter.closeTag("corpus-metadata")

    def addMap(self, intituleType, intituleMap, xmlWriter):
        """Write <intitule-corpus> or <intitule-fiche> when intituleMap is not empty."""
        suffix = ""
        if intituleType == INTITULE_CORPUS:
            suffix = "corpus"
        elif intituleType == INTITULE_FICHE:
            suffix = "fiche"
        _writeIntituleMap(xmlWriter, suffix, intituleMap)


class ThesaurusMetadataExport(object):
    """Builder of the <thesaurus-metadata> element.

    See http://www.scrutari.net/dokuwiki/scrutaridata:exportapi:thesaurusmetadataexport
    """

    def __init__(self):
        self.thesaurusMap = OrderedDict()

    def setIntitule(self, intituleType, lang, intituleValue):
        """Set the thesaurus title for lang.

        Raises Exception when intituleType is not INTITULE_THESAURUS.
        """
        if intituleType == INTITULE_THESAURUS:
            self.thesaurusMap[lang] = intituleValue
        else:
            raise _wrongIntituleType(intituleType)

    def writeXML(self, xmlWriter):
        """Write the <thesaurus-metadata> element."""
        xmlWriter.openTag("thesaurus-metadata")
        self.addMap(INTITULE_THESAURUS, self.thesaurusMap, xmlWriter)
        xmlWriter.closeTag("thesaurus-metadata")

    def addMap(self, intituleType, intituleMap, xmlWriter):
        """Write <intitule-thesaurus> when intituleMap is not empty."""
        suffix = ""
        if intituleType == INTITULE_THESAURUS:
            suffix = "thesaurus"
        _writeIntituleMap(xmlWriter, suffix, intituleMap)


class FicheExport(object):
    """Builder of a <fiche> element.

    See http://www.scrutari.net/dokuwiki/scrutaridata:exportapi:ficheexport
    """

    def __init__(self):
        self.reinit("")

    def reinit(self, ficheId):
        """Reset every field so that the instance can describe a new fiche."""
        self.ficheId = ficheId
        self.titre = ""
        self.soustitre = ""
        self.date = ""
        self.lang = ""
        self.href = ""
        self.ficheIcon = ""
        self.latitude = ""
        self.longitude = ""
        self.complementArray = []
        # "nameSpace:localKey" -> list of values
        self.attributeMap = OrderedDict()

    def setTitre(self, titre):
        self.titre = titre

    def setSoustitre(self, soustitre):
        self.soustitre = soustitre

    def setDate(self, date):
        self.date = date

    def setLang(self, lang):
        self.lang = lang

    def setHref(self, href):
        self.href = href

    def setFicheIcon(self, ficheIcon):
        self.ficheIcon = ficheIcon

    def setGeoloc(self, latitude, longitude):
        self.latitude = latitude
        self.longitude = longitude

    def addComplement(self, complementNumber, complementValue):
        """Set the value of the complement with the given 1-based number.

        Missing intermediate complements are created empty. An empty value
        never extends the list; numbers below 1 are ignored.
        """
        if complementNumber < 1:
            return
        if complementValue is None:
            complementValue = ""
        else:
            complementValue = str(complementValue).strip()
        currentLen = len(self.complementArray)
        if complementNumber > currentLen:
            if len(complementValue) == 0:
                return
            self.complementArray.extend([""] * (complementNumber - currentLen))
        self.complementArray[complementNumber - 1] = complementValue

    def addAttributeValue(self, nameSpace, localKey, attributeValue):
        """Append a value to the attribute nameSpace:localKey; blank values are ignored."""
        if attributeValue is None:
            return
        attributeValue = str(attributeValue).strip()
        if len(attributeValue) == 0:
            return
        key = nameSpace + ":" + localKey
        if key in self.attributeMap:
            self.attributeMap[key].append(attributeValue)
        else:
            self.attributeMap[key] = [attributeValue]

    def writeXML(self, xmlWriter):
        """Write the <fiche> element with its fields, complements and attributes."""
        xmlWriter.openTagWithAttribute("fiche", "fiche-id", self.ficheId)
        xmlWriter.addSimpleElement("titre", self.titre)
        xmlWriter.addSimpleElement("soustitre", self.soustitre)
        xmlWriter.addSimpleElement("date", self.date)
        xmlWriter.addSimpleElement("lang", self.lang)
        xmlWriter.addSimpleElement("href", self.href)
        xmlWriter.addSimpleElement("fiche-icon", self.ficheIcon)
        # Coordinates may be given as numbers, so test their string form
        # rather than calling len() on them directly.
        if _hasText(self.latitude):
            xmlWriter.openTag("geoloc")
            xmlWriter.addSimpleElement("lat", self.latitude)
            xmlWriter.addSimpleElement("lon", self.longitude)
            xmlWriter.closeTag("geoloc")
        for complement in self.complementArray:
            complementString = str(complement).strip()
            if len(complementString) > 0:
                xmlWriter.addSimpleElement("complement", complementString)
            else:
                # An empty element keeps the position of later complements.
                xmlWriter.addEmptyElement("complement", None)
        for key, values in self.attributeMap.items():
            # Split on the first ':' only, so a local key containing ':'
            # is written in full.
            nameSpace, _, localKey = key.partition(':')
            xmlWriter.openTagWithAttributes(
                "attr", OrderedDict([("ns", nameSpace), ("key", localKey)]))
            for value in values:
                xmlWriter.addSimpleElement("val", value)
            xmlWriter.closeTag("attr")
        xmlWriter.closeTag("fiche")


class MotcleExport(object):
    """Builder of a <motcle> element.

    See http://www.scrutari.net/dokuwiki/scrutaridata:exportapi:motcleexport
    """

    def __init__(self):
        self.motcleId = ""
        self.libelleMap = OrderedDict()

    def reinit(self, motcleId):
        """Reset the instance so that it can describe a new motcle."""
        self.motcleId = motcleId
        self.libelleMap = OrderedDict()

    def setLibelle(self, lang, text):
        self.libelleMap[lang] = text

    def writeXML(self, xmlWriter):
        """Write the <motcle> element with one <lib> child per language."""
        xmlWriter.openTagWithAttribute("motcle", "motcle-id", self.motcleId)
        for lang, text in self.libelleMap.items():
            xmlWriter.addLibElement(lang, text)
        xmlWriter.closeTag("motcle")


#-------------------------------------------------------------------
# ThesaurusBuffer is a utility class that stores the keywords (motcles)
# met while reading a page so that they can be written out later
#-------------------------------------------------------------------

class ThesaurusBuffer(object):
    """In-memory store of motcles, optionally preloaded from an XML file."""

    def __init__(self, path=None):
        """Create the buffer, loading <motcle> elements from path when it exists."""
        self.path = path
        self.motcleMap = OrderedDict()
        self.availableId = 1
        # lang -> {text -> MotcleBuffer}
        self.langMap = {}
        if (path is not None) and (os.path.exists(path)):
            self._load(path)

    def _load(self, path):
        """Fill motcleMap, langMap and availableId from the XML file at path."""
        dom = parse(path)
        for motcleElement in dom.getElementsByTagName("motcle"):
            motcleId = motcleElement.getAttribute("motcle-id")
            if len(motcleId) == 0:
                continue
            if motcleId in self.motcleMap:
                motcleBuffer = self.motcleMap[motcleId]
            else:
                motcleBuffer = MotcleBuffer(motcleId)
                self.motcleMap[motcleId] = motcleBuffer
            for libElement in motcleElement.getElementsByTagName("lib"):
                lang = libElement.getAttribute("xml:lang")
                if len(lang) == 0:
                    continue
                text = ''.join(
                    node.data for node in libElement.childNodes
                    if node.nodeType == node.TEXT_NODE)
                motcleBuffer.put(lang, text.strip())
        for motcleId, motcleBuffer in self.motcleMap.items():
            # Ids read from XML are strings, so numeric strings are converted
            # before comparison; '>=' ensures the highest id in use is never
            # handed out again.
            numericId = _toInt(motcleId)
            if numericId is not None and numericId >= self.availableId:
                self.availableId = numericId + 1
            for lang, text in motcleBuffer.libelleMap.items():
                self.getSubMap(lang)[text] = motcleBuffer

    def getSubMap(self, lang):
        """Return the text -> MotcleBuffer map of lang, creating it if needed."""
        if lang in self.langMap:
            return self.langMap[lang]
        subMap = {}
        self.langMap[lang] = subMap
        return subMap

    def getMotcleId(self, lang, text):
        """Return the id of the motcle labelled text in lang, or None."""
        if lang in self.langMap:
            subMap = self.langMap[lang]
            if text in subMap:
                return subMap[text].motcleId
        return None

    def checkMotcle(self, motcleId, lang, text):
        """Ensure the motcle exists and has a label in lang.

        An existing label for lang is left untouched.
        """
        if motcleId in self.motcleMap:
            motcleBuffer = self.motcleMap[motcleId]
        else:
            motcleBuffer = MotcleBuffer(motcleId)
            self.motcleMap[motcleId] = motcleBuffer
        existing = motcleBuffer.check(lang, text)
        if not existing:
            self.getSubMap(lang)[text] = motcleBuffer

    def exportMotscles(self, scrutariDataExport):
        """Write every stored motcle through scrutariDataExport.newMotcle()."""
        for motcleId, motcleBuffer in self.motcleMap.items():
            motcleExport = scrutariDataExport.newMotcle(motcleId)
            for lang, text in motcleBuffer.libelleMap.items():
                motcleExport.setLibelle(lang, text)


class MotcleBuffer(object):
    """Labels of a single motcle, keyed by language."""

    def __init__(self, motcleId):
        self.motcleId = motcleId
        self.libelleMap = OrderedDict()

    def put(self, lang, text):
        """Set (or replace) the label for lang."""
        self.libelleMap[lang] = text

    def check(self, lang, text):
        """Return True when lang already has a label; otherwise store text and return False."""
        if lang in self.libelleMap:
            return True
        self.libelleMap[lang] = text
        return False


#-------------------------------------------------------------------
# Utilities
#-------------------------------------------------------------------

def writeScrutariInfo(xmlWriter, scrutariDataUrls, date=""):
    """Write the XML of the information file.

    See http://www.scrutari.net/dokuwiki/serveurscrutari:config:scrutariinfo

    scrutariDataUrls may be a list (or tuple) of URLs or a single string.
    When date is empty, the current local date (YYYY-MM-DD) is used.
    """
    if not date:
        date = time.strftime("%Y-%m-%d")
    xmlWriter.openTag("info")
    xmlWriter.addSimpleElement("date", date)
    if isinstance(scrutariDataUrls, (list, tuple)):
        for scrutariDataUrl in scrutariDataUrls:
            xmlWriter.addSimpleElement("scrutaridata-url", scrutariDataUrl)
    else:
        xmlWriter.addSimpleElement("scrutaridata-url", scrutariDataUrls)
    xmlWriter.closeTag("info")


class XmlWriter(object):
    """XML writing utility used by the export classes.

    The writer object given to the constructor must offer a write()
    method (typically the file object returned by open()).
    """

    def __init__(self, writer, pretty_xml=True, with_declaration=True):
        self.writer = writer
        if pretty_xml:
            self.indentLength = 0
        else:
            # Large negative value: indentation stays disabled however
            # deeply the tags are nested.
            self.indentLength = -999999
        if with_declaration:
            self.appendDeclaration()

    def appendDeclaration(self):
        """Write the XML declaration."""
        self.writer.write('<?xml version="1.0" encoding="UTF-8"?>')
        # In pretty mode the line break comes from the first appendIndent().
        if self.indentLength < 0:
            self.writer.write('\n')

    def openTag(self, tagName, increaseIndent=True):
        """Write <tagName>."""
        self.appendIndent()
        self.writer.write('<')
        self.writer.write(tagName)
        self.writer.write('>')
        if increaseIndent:
            self.increaseIndentValue()

    def openTagWithAttribute(self, tagName, attributeName, attributeValue,
                             increaseIndent=True):
        """Write <tagName attributeName="attributeValue">."""
        self.appendIndent()
        self.writer.write('<')
        self.writer.write(tagName)
        self.addAttribute(attributeName, attributeValue)
        self.writer.write('>')
        if increaseIndent:
            self.increaseIndentValue()

    def openTagWithAttributes(self, tagName, attributesMap, increaseIndent=True):
        """Write <tagName ...> with one attribute per entry of attributesMap."""
        self.appendIndent()
        self.writer.write('<')
        self.writer.write(tagName)
        for attributeName, attributeValue in attributesMap.items():
            self.addAttribute(attributeName, attributeValue)
        self.writer.write('>')
        if increaseIndent:
            self.increaseIndentValue()

    def addAttribute(self, attributeName, attributeValue):
        """Write name="value"; None and empty values are skipped."""
        if attributeValue is None:
            return
        attributeValue = str(attributeValue)
        if len(attributeValue) == 0:
            return
        self.writer.write(' ')
        self.writer.write(attributeName)
        self.writer.write('=')
        self.writer.write('"')
        self.escape(attributeValue)
        self.writer.write('"')

    def escape(self, value):
        """Write value with XML special characters replaced by entities."""
        escaped = ''.join(_XML_ESCAPES.get(char, char) for char in value)
        if escaped:
            self.writer.write(escaped)

    def closeTag(self, tagName, decrease=True):
        """Write </tagName>.

        With decrease=False the closing tag stays on the current line
        (used for simple text elements).
        """
        if decrease:
            self.decreaseIndentValue()
            self.appendIndent()
        self.writer.write('</')
        self.writer.write(tagName)
        self.writer.write('>')

    def addSimpleElement(self, tagName, value):
        """Write <tagName>value</tagName>; None and blank values are skipped."""
        if value is None:
            return
        value = str(value).strip()
        if len(value) == 0:
            return
        self.openTag(tagName, False)
        self.escape(value)
        self.closeTag(tagName, False)

    def addEmptyElement(self, tagName, attributesMap):
        """Write <tagName .../>; attributesMap may be None."""
        self.appendIndent()
        self.writer.write('<')
        self.writer.write(tagName)
        if attributesMap is not None:
            for attributeName, attributeValue in attributesMap.items():
                self.addAttribute(attributeName, attributeValue)
        self.writer.write('/>')

    def addLibElement(self, lang, value):
        """Write <lib xml:lang="lang">value</lib>; None and empty values are skipped."""
        if value is None:
            return
        value = str(value)
        if len(value) == 0:
            return
        self.appendIndent()
        self.writer.write('<lib')
        self.addAttribute('xml:lang', lang)
        self.writer.write('>')
        self.escape(value)
        self.writer.write('</lib>')

    def appendIndent(self):
        """In pretty mode, start a new line indented with tabs."""
        if self.indentLength > -1:
            self.writer.write('\n' + '\t' * self.indentLength)

    def increaseIndentValue(self):
        self.indentLength = self.indentLength + 1

    def decreaseIndentValue(self):
        self.indentLength = self.indentLength - 1