# tiramisu/tiramisu/storage/sqlite3/value.py
# 2017-07-13 22:04:06 +02:00

# 254 lines
# 11 KiB
# Python

# -*- coding: utf-8 -*-
"sqlite3 plugin for value: store values in a sqlite3 database"
# Copyright (C) 2013-2017 Team tiramisu (see AUTHORS for all contributors)
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# ____________________________________________________________
from .sqlite3db import Sqlite3DB
from .storage import delete_session
from ...setting import undefined, owners
from ...i18n import _
class Values(Sqlite3DB):
    """Sqlite3 storage plugin for option values, owners and informations.

    Rows live in the ``value`` and ``information`` tables and every request
    is filtered on ``self._session_id``.  ``self._storage``,
    ``self._session_id`` and the ``_sqlite_encode*``/``_sqlite_decode*``
    helpers are not defined here; presumably they come from ``Sqlite3DB``.
    """
    __slots__ = ('__weakref__',)

    def __init__(self, storage):
        """init plugin means create values storage

        :param storage: storage object forwarded to the parent constructor
        """
        super(Values, self).__init__(storage)

    def getsession(self):
        """Do nothing and return ``None``."""
        pass

    # sqlite
    def _sqlite_select(self, path, index):
        """Select the stored value of ``path`` for the current session.

        :param path: already encoded path
        :param index: row index, or ``None`` to not filter on ``idx``
        :return: whatever ``self._storage.select`` returns for a single row
                 request (callers below treat ``None`` as "no row")
        """
        request = "SELECT value FROM value WHERE path = ? AND session_id = ? "
        params = (path, self._session_id)
        if index is not None:
            request += "and idx = ? "
            params = (path, self._session_id, index)
        request += "LIMIT 1"
        return self._storage.select(request, params)

    # value
    def setvalue(self, path, value, owner, index, session):
        """set value for an option
        a specified value must be associated to an owner

        The previous row(s) are deleted without committing, then the new row
        is inserted and committed.

        :param path: option path (encoded with ``_sqlite_encode_path``)
        :param value: value to store (encoded with ``_sqlite_encode``)
        :param owner: owner, stored as ``str(owner)``
        :param index: row index, or ``None``; with ``None`` every row of
                      ``path`` is replaced by a single row without ``idx``
        :param session: unused here
        """
        path = self._sqlite_encode_path(path)
        if index is not None:
            self._storage.execute("DELETE FROM value WHERE path = ? AND idx = ? AND "
                                  "session_id = ?", (path, index, self._session_id),
                                  commit=False)
            self._storage.execute("INSERT INTO value(path, value, owner, idx, session_id) VALUES "
                                  "(?, ?, ?, ?, ?)", (path, self._sqlite_encode(value),
                                                      str(owner),
                                                      index,
                                                      self._session_id),
                                  commit=True)
        else:
            self._storage.execute("DELETE FROM value WHERE path = ? AND session_id = ?",
                                  (path, self._session_id),
                                  commit=False)
            self._storage.execute("INSERT INTO value(path, value, owner, session_id) VALUES "
                                  "(?, ?, ?, ?)", (path, self._sqlite_encode(value),
                                                   str(owner),
                                                   self._session_id),
                                  commit=True)

    def getvalue(self, path, session, index=None):
        """get value for an option

        :param session: unused here
        :return: only the decoded value, not the owner; ``None`` when no row
                 is found
        """
        path = self._sqlite_encode_path(path)
        values = self._sqlite_select(path, index)
        if values is None:
            return values
        return self._sqlite_decode(values[0])

    def hasvalue(self, path, index=None):
        """if opt has a value
        return: boolean
        """
        path = self._sqlite_encode_path(path)
        return self._sqlite_select(path, index) is not None

    def resetvalue(self, path, session):
        """remove value means delete value in storage

        Every row of ``path`` in the current session is deleted, whatever
        its index.  ``session`` is unused here.
        """
        path = self._sqlite_encode_path(path)
        self._storage.execute("DELETE FROM value WHERE path = ? AND session_id = ?",
                              (path, self._session_id))

    def get_modified_values(self):
        """return all values in a dictionary
        example: {option1: (owner, 'value1'), option2: (owner, 'value2')}

        Rows with an index are grouped per path as
        ``({'<index>': owner, ...}, {'<index>': value, ...})``; decoded lists
        are converted to tuples.

        :raises AttributeError: if a stored owner is not an attribute of
                                ``owners``
        """
        ret = {}
        # Columns are named explicitly so the unpacking below does not depend
        # on the table's column order, and no throw-away loop variable has to
        # shadow the i18n ``_`` function.
        rows = self._storage.select("SELECT path, value, owner, idx FROM value WHERE "
                                    "session_id = ?",
                                    (self._session_id,),
                                    only_one=False)
        for path, value, owner, index in rows:
            path = self._sqlite_decode_path(path)
            owner = getattr(owners, owner)

            value = self._sqlite_decode(value)
            if isinstance(value, list):
                value = tuple(value)
            if index is None:
                ret[path] = (owner, value)
            else:
                key = str(index)
                if path in ret:
                    ret[path][0][key] = owner
                    ret[path][1][key] = value
                else:
                    ret[path] = ({key: owner}, {key: value})
        return ret

    # owner
    def setowner(self, path, owner, session, index=None):
        """change owner for an option

        Only existing rows are updated; nothing is inserted.  ``session`` is
        unused here.
        """
        path = self._sqlite_encode_path(path)
        if index is None:
            self._storage.execute("UPDATE value SET owner = ? WHERE path = ? AND session_id = ?",
                                  (str(owner), path, self._session_id))
        else:
            self._storage.execute("UPDATE value SET owner = ? WHERE path = ? and idx = ? AND session_id = ?",
                                  (str(owner), path, index, self._session_id))

    def getowner(self, path, default, session, index=None, only_default=False, with_value=False):
        """get owner for an option

        :param default: owner returned when no row is stored
        :param session: unused here
        :param index: row index, or ``None`` to not filter on ``idx``
        :param only_default: unused here
        :param with_value: when true, return ``(owner, value)`` instead of
                           the owner alone (value is ``None`` without row)
        :return: owner object, or ``(owner object, decoded value)``
        """
        path = self._sqlite_encode_path(path)
        request = "SELECT owner, value FROM value WHERE path = ? AND session_id = ?"
        if index is not None:
            request += " AND idx = ?"
            params = (path, self._session_id, index)
        else:
            params = (path, self._session_id)
        request += ' LIMIT 1'
        row = self._storage.select(request, params)
        if row is None:
            if not with_value:
                return default
            return default, None
        owner_name = row[0]
        # autocreate owners: the stored name may not be known to ``owners``
        # yet, so add it and look it up again
        try:
            nowner = getattr(owners, owner_name)
        except AttributeError:
            owners.addowner(owner_name)
            nowner = getattr(owners, owner_name)
        if not with_value:
            return nowner
        return nowner, self._sqlite_decode(row[1])

    def set_information(self, key, value):
        """updates the information's attribute
        (which is a dictionary)

        :param key: information's key (ex: "help", "doc")
        :param value: information's value (ex: "the help string")
        """
        # the delete is not committed on its own (commit flag is False)
        self._storage.execute("DELETE FROM information WHERE key = ? AND session_id = ?",
                              (key, self._session_id),
                              False)
        self._storage.execute("INSERT INTO information(key, value, session_id) VALUES "
                              "(?, ?, ?)", (key, self._sqlite_encode(value), self._session_id))

    def get_information(self, key, default):
        """retrieves one information's item

        :param key: the item string (ex: "help")
        :param default: returned when the key is not stored
        :raises ValueError: if the key is not stored and ``default`` is
                            ``undefined``
        """
        value = self._storage.select("SELECT value FROM information WHERE key = ? AND "
                                     "session_id = ?",
                                     (key, self._session_id))
        if value is None:
            if default is undefined:
                raise ValueError(_("information's item"
                                   " not found: {0}").format(key))
            return default
        return self._sqlite_decode(value[0])

    def del_information(self, key, raises):
        """delete one information's item

        :param key: the item string (ex: "help")
        :param raises: when true, a missing key is an error
        :raises ValueError: if ``raises`` is true and the key is not stored
        """
        if raises and self._storage.select("SELECT value FROM information WHERE key = ? "
                                           "AND session_id = ?",
                                           (key, self._session_id)) is None:
            raise ValueError(_("information's item not found {0}").format(key))
        self._storage.execute("DELETE FROM information WHERE key = ? AND session_id = ?",
                              (key, self._session_id))

    def exportation(self, session, fake=False):
        """export every value row of the current session

        :param session: unused here
        :param fake: unused here
        :return: ``[paths, indexes, values, owners]``, four parallel lists.
                 For a row without index the entries are scalars (index is
                 ``None``).  Rows with an index are grouped per path: the
                 index, value and owner entries are then three lists of the
                 same length.  Paths are returned as stored (not decoded).
        """
        rows = self._storage.select("SELECT path, value, owner, idx FROM value WHERE "
                                    "session_id = ?;", (self._session_id,), only_one=False)
        paths, indexes, values, owner_names = [], [], [], []
        # path -> position of its first entry, to avoid scanning ``paths``
        positions = {}
        for row in rows:
            path = row[0]
            value = self._sqlite_decode(row[1])
            owner = row[2]
            index = row[3]
            if index is None:
                positions.setdefault(path, len(paths))
                paths.append(path)
                indexes.append(index)
                values.append(value)
                owner_names.append(owner)
            elif path in positions:
                position = positions[path]
                indexes[position].append(index)
                values[position].append(value)
                # keep one owner per index, otherwise importation() cannot
                # restore the owner of every row
                owner_names[position].append(owner)
            else:
                positions[path] = len(paths)
                paths.append(path)
                indexes.append([index])
                values.append([value])
                owner_names.append([owner])
        return [paths, indexes, values, owner_names]

    def importation(self, export):
        """replace every value row of the current session by ``export``

        :param export: ``[paths, indexes, values, owners]`` as returned by
                       ``exportation``.  For an indexed path the owner may
                       also be a single (non list) owner, which is then
                       applied to every index of that path.

        All requests are executed without commit; a single commit is done at
        the end.
        """
        request = ("INSERT INTO value(path, value, owner, idx, session_id) VALUES "
                   "(?, ?, ?, ?, ?)")
        self._storage.execute("DELETE FROM value WHERE session_id = ?", (self._session_id,),
                              commit=False)
        for idx, path in enumerate(export[0]):
            index = export[1][idx]
            value = export[2][idx]
            owner = export[3][idx]
            if index is None:
                self._storage.execute(request, (path, self._sqlite_encode(value),
                                                str(owner), index,
                                                self._session_id), commit=False)
            else:
                if not isinstance(owner, (list, tuple)):
                    # a bare owner must not be zipped character by character
                    owner = [owner] * len(index)
                for sub_index, sub_value, sub_owner in zip(index, value, owner):
                    self._storage.execute(request, (path,
                                                    self._sqlite_encode(sub_value),
                                                    str(sub_owner), sub_index,
                                                    self._session_id),
                                          commit=False)
        self._storage._conn.commit()

    def get_max_length(self, path, session):
        """return the highest stored index of ``path`` plus one

        :param session: unused here
        :return: ``0`` when ``max(idx)`` is NULL (no indexed row)
        """
        # NOTE(review): unlike the other methods, path is not passed through
        # _sqlite_encode_path here -- confirm this is intended
        val_max = self._storage.select("SELECT max(idx) FROM value WHERE path = ? AND session_id = ?",
                                       (path, self._session_id), False)
        if val_max[0][0] is None:
            return 0
        return val_max[0][0] + 1