# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
# pylint: disable=sql-injection
from __future__ import annotations

import enum
import json
import logging
import re
from binascii import crc32
from collections import defaultdict
from typing import Iterable, Union

import psycopg2

from .misc import named_to_positional_printf

_schema = logging.getLogger('odoo.schema')

IDENT_RE = re.compile(r'^[a-z0-9_][a-z0-9_$\-]*$', re.I)

_CONFDELTYPES = {
    'RESTRICT': 'r',
    'NO ACTION': 'a',
    'CASCADE': 'c',
    'SET NULL': 'n',
    'SET DEFAULT': 'd',
}


class SQL:
    """ An object that wraps SQL code with its parameters, like::

        sql = SQL("UPDATE TABLE foo SET a = %s, b = %s", 'hello', 42)
        cr.execute(sql)

    The code is given as a ``%``-format string, and supports either positional
    arguments (with ``%s``) or named arguments (with ``%(name)s``). Escaped
    characters (like ``"%%"``) are not supported, though. The arguments are
    meant to be merged into the code using the ``%`` formatting operator.

    The SQL wrapper is designed to be composable: the arguments can be either
    actual parameters, or SQL objects themselves::

        sql = SQL(
            "UPDATE TABLE %s SET %s",
            SQL.identifier(tablename),
            SQL("%s = %s", SQL.identifier(columnname), value),
        )

    The combined SQL code is given by ``sql.code``, while the corresponding
    combined parameters are given by the list ``sql.params``. This allows
    combining any number of SQL terms without having to separately combine
    their parameters, which can be tedious, bug-prone, and is the main
    downside of ``psycopg2.sql``.

    The second purpose of the wrapper is to discourage SQL injections. Indeed,
    if ``code`` is a string literal (not a dynamic string), then the SQL object
    made with ``code`` is guaranteed to be safe, provided the SQL objects
    within its parameters are themselves safe.
    """
    __slots__ = ('__code', '__args')

    # pylint: disable=keyword-arg-before-vararg
    def __new__(cls, code: (str | SQL) = "", /, *args, **kwargs):
        if isinstance(code, SQL):
            return code

        # validate the format of code and parameters
        if args and kwargs:
            raise TypeError("SQL() takes either positional arguments, or named arguments")
        if args:
            code % tuple("" for arg in args)  # check the % format of code
        elif kwargs:
            code, args = named_to_positional_printf(code, kwargs)

        self = object.__new__(cls)
        self.__code = code
        self.__args = args
        return self

    @property
    def code(self) -> str:
        """ Return the combined SQL code string. """
        stack = []  # stack of intermediate results
        for node in self.__postfix():
            if not isinstance(node, SQL):
                stack.append("%s")
            elif arity := len(node.__args):
                stack[-arity:] = [node.__code % tuple(stack[-arity:])]
            else:
                stack.append(node.__code)
        return stack[0]

    @property
    def params(self) -> list:
        """ Return the combined SQL code params as a list of values. """
        return [node for node in self.__postfix() if not isinstance(node, SQL)]

    def __postfix(self):
        """ Return a postfix iterator for the SQL tree ``self``. """
        stack = [(self, False)]  # stack of (node, ispostfix)
        while stack:
            node, ispostfix = stack.pop()
            if ispostfix or not isinstance(node, SQL):
                yield node
            else:
                stack.append((node, True))
                stack.extend((arg, False) for arg in reversed(node.__args))

    def __repr__(self):
        return f"SQL({', '.join(map(repr, [self.code, *self.params]))})"

    def __bool__(self):
        return bool(self.__code)

    def __eq__(self, other):
        return self.code == other.code and self.params == other.params
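
    # Illustrative example (table and column names are made up): composition
    # flattens nested SQL objects into a single code string, and collects the
    # plain values as parameters.
    #
    #   frag = SQL("%s = %s", SQL.identifier("active"), True)
    #   sql = SQL("UPDATE %s SET %s", SQL.identifier("res_partner"), frag)
    #   sql.code    # 'UPDATE "res_partner" SET "active" = %s'
    #   sql.params  # [True]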

    def __iter__(self):
        """ Yield ``self.code`` and ``self.params``. This was introduced for
        backward compatibility, as it makes it possible to access the SQL code
        and parameters by unpacking the object::

            sql = SQL(...)
            code, params = sql
        """
        yield self.code
        yield self.params

    def join(self, args: Iterable) -> SQL:
        """ Join SQL objects or parameters with ``self`` as a separator. """
        args = list(args)
        # optimizations for special cases
        if len(args) == 0:
            return SQL()
        if len(args) == 1 and isinstance(args[0], SQL):
            return args[0]
        if not self.__args:
            return SQL(self.__code.join("%s" for arg in args), *args)
        # general case: alternate args with self
        items = [self] * (len(args) * 2 - 1)
        for index, arg in enumerate(args):
            items[index * 2] = arg
        return SQL("%s" * len(items), *items)

    @classmethod
    def identifier(cls, name: str, subname: (str | None) = None) -> SQL:
        """ Return an SQL object that represents an identifier. """
        assert IDENT_RE.match(name), f"{name!r} invalid for SQL.identifier()"
        if subname is None:
            return cls(f'"{name}"')
        assert IDENT_RE.match(subname), f"{subname!r} invalid for SQL.identifier()"
        return cls(f'"{name}"."{subname}"')


def existing_tables(cr, tablenames):
    """ Return the names of existing tables among ``tablenames``. """
    cr.execute(SQL("""
        SELECT c.relname
          FROM pg_class c
          JOIN pg_namespace n ON (n.oid = c.relnamespace)
         WHERE c.relname IN %s
           AND c.relkind IN ('r', 'v', 'm')
           AND n.nspname = current_schema
    """, tuple(tablenames)))
    return [row[0] for row in cr.fetchall()]


def table_exists(cr, tablename):
    """ Return whether the given table exists. """
    return len(existing_tables(cr, {tablename})) == 1


class TableKind(enum.Enum):
    Regular = 'r'
    Temporary = 't'
    View = 'v'
    Materialized = 'm'
    Foreign = 'f'
    Other = None


def table_kind(cr, tablename: str) -> Union[TableKind, None]:
    """ Return the kind of a table, if ``tablename`` is a regular or foreign
    table, or a view (ignores indexes, sequences, toast tables, and
    partitioned tables; unlogged tables are considered regular).
    """
    cr.execute(SQL("""
        SELECT c.relkind, c.relpersistence
          FROM pg_class c
          JOIN pg_namespace n ON (n.oid = c.relnamespace)
         WHERE c.relname = %s
           AND n.nspname = current_schema
    """, tablename))
    if not cr.rowcount:
        return None

    kind, persistence = cr.fetchone()
    # special case: permanent, temporary, and unlogged tables differ by their
    # relpersistence, but they're all "ordinary" (relkind = 'r')
    if kind == 'r':
        return TableKind.Temporary if persistence == 't' else TableKind.Regular

    try:
        return TableKind(kind)
    except ValueError:
        # NB: or raise? unclear if it makes sense to allow table_kind to
        # "work" with something like an index or sequence
        return TableKind.Other


# prescribed column order by type: columns aligned on 4 bytes, columns aligned
# on 1 byte, columns aligned on 8 bytes (values have been chosen to minimize
# padding in rows; unknown column types are put last)
SQL_ORDER_BY_TYPE = defaultdict(lambda: 16, {
    'int4': 1,       # 4 bytes aligned on 4 bytes
    'varchar': 2,    # variable aligned on 4 bytes
    'date': 3,       # 4 bytes aligned on 4 bytes
    'jsonb': 4,      # jsonb
    'text': 5,       # variable aligned on 4 bytes
    'numeric': 6,    # variable aligned on 4 bytes
    'bool': 7,       # 1 byte aligned on 1 byte
    'timestamp': 8,  # 8 bytes aligned on 8 bytes
    'float8': 9,     # 8 bytes aligned on 8 bytes
})
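

# Illustrative example of SQL.join() (hypothetical column names):
#
#   assignments = SQL(", ").join(
#       SQL("%s = %s", SQL.identifier(name), value)
#       for name, value in [("name", "foo"), ("active", True)]
#   )
#   assignments.code    # '"name" = %s, "active" = %s'
#   assignments.params  # ['foo', True]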
""" colspecs = [ SQL('id SERIAL NOT NULL'), *(SQL("%s %s", SQL.identifier(colname), SQL(coltype)) for colname, coltype, _ in columns), SQL('PRIMARY KEY(id)'), ] queries = [ SQL("CREATE TABLE %s (%s)", SQL.identifier(tablename), SQL(", ").join(colspecs)), ] if comment: queries.append(SQL( "COMMENT ON TABLE %s IS %s", SQL.identifier(tablename), comment, )) for colname, _, colcomment in columns: queries.append(SQL( "COMMENT ON COLUMN %s IS %s", SQL.identifier(tablename, colname), colcomment, )) cr.execute(SQL("; ").join(queries)) _schema.debug("Table %r: created", tablename) def table_columns(cr, tablename): """ Return a dict mapping column names to their configuration. The latter is a dict with the data from the table ``information_schema.columns``. """ # Do not select the field `character_octet_length` from `information_schema.columns` # because specific access right restriction in the context of shared hosting (Heroku, OVH, ...) # might prevent a postgres user to read this field. cr.execute(SQL( ''' SELECT column_name, udt_name, character_maximum_length, is_nullable FROM information_schema.columns WHERE table_name=%s ''', tablename, )) return {row['column_name']: row for row in cr.dictfetchall()} def column_exists(cr, tablename, columnname): """ Return whether the given column exists. """ cr.execute(SQL( """ SELECT 1 FROM information_schema.columns WHERE table_name=%s AND column_name=%s """, tablename, columnname, )) return cr.rowcount def create_column(cr, tablename, columnname, columntype, comment=None): """ Create a column with the given type. """ sql = SQL( "ALTER TABLE %s ADD COLUMN %s %s %s", SQL.identifier(tablename), SQL.identifier(columnname), SQL(columntype), SQL("DEFAULT false" if columntype.upper() == 'BOOLEAN' else ""), ) if comment: sql = SQL("%s; %s", sql, SQL( "COMMENT ON COLUMN %s IS %s", SQL.identifier(tablename, columnname), comment, )) cr.execute(sql) _schema.debug("Table %r: added column %r of type %s", tablename, columnname, columntype) def rename_column(cr, tablename, columnname1, columnname2): """ Rename the given column. """ cr.execute(SQL( "ALTER TABLE %s RENAME COLUMN %s TO %s", SQL.identifier(tablename), SQL.identifier(columnname1), SQL.identifier(columnname2), )) _schema.debug("Table %r: renamed column %r to %r", tablename, columnname1, columnname2) def convert_column(cr, tablename, columnname, columntype): """ Convert the column to the given type. """ using = SQL("%s::%s", SQL.identifier(columnname), SQL(columntype)) _convert_column(cr, tablename, columnname, columntype, using) def convert_column_translatable(cr, tablename, columnname, columntype): """ Convert the column from/to a 'jsonb' translated field column. 
""" drop_index(cr, make_index_name(tablename, columnname), tablename) if columntype == "jsonb": using = SQL( "CASE WHEN %s IS NOT NULL THEN jsonb_build_object('en_US', %s::varchar) END", SQL.identifier(columnname), SQL.identifier(columnname), ) else: using = SQL("%s->>'en_US'", SQL.identifier(columnname)) _convert_column(cr, tablename, columnname, columntype, using) def _convert_column(cr, tablename, columnname, columntype, using: SQL): query = SQL( "ALTER TABLE %s ALTER COLUMN %s DROP DEFAULT, ALTER COLUMN %s TYPE %s USING %s", SQL.identifier(tablename), SQL.identifier(columnname), SQL.identifier(columnname), SQL(columntype), using, ) try: with cr.savepoint(flush=False): cr.execute(query, log_exceptions=False) except psycopg2.NotSupportedError: drop_depending_views(cr, tablename, columnname) cr.execute(query) _schema.debug("Table %r: column %r changed to type %s", tablename, columnname, columntype) def drop_depending_views(cr, table, column): """drop views depending on a field to allow the ORM to resize it in-place""" for v, k in get_depending_views(cr, table, column): cr.execute(SQL( "DROP %s IF EXISTS %s CASCADE", SQL("MATERIALIZED VIEW" if k == "m" else "VIEW"), SQL.identifier(v), )) _schema.debug("Drop view %r", v) def get_depending_views(cr, table, column): # http://stackoverflow.com/a/11773226/75349 cr.execute(SQL(""" SELECT distinct quote_ident(dependee.relname), dependee.relkind FROM pg_depend JOIN pg_rewrite ON pg_depend.objid = pg_rewrite.oid JOIN pg_class as dependee ON pg_rewrite.ev_class = dependee.oid JOIN pg_class as dependent ON pg_depend.refobjid = dependent.oid JOIN pg_attribute ON pg_depend.refobjid = pg_attribute.attrelid AND pg_depend.refobjsubid = pg_attribute.attnum WHERE dependent.relname = %s AND pg_attribute.attnum > 0 AND pg_attribute.attname = %s AND dependee.relkind in ('v', 'm') """, table, column)) return cr.fetchall() def set_not_null(cr, tablename, columnname): """ Add a NOT NULL constraint on the given column. """ query = SQL( "ALTER TABLE %s ALTER COLUMN %s SET NOT NULL", SQL.identifier(tablename), SQL.identifier(columnname), ) try: with cr.savepoint(flush=False): cr.execute(query, log_exceptions=False) _schema.debug("Table %r: column %r: added constraint NOT NULL", tablename, columnname) except Exception: raise Exception("Table %r: unable to set NOT NULL on column %r", tablename, columnname) def drop_not_null(cr, tablename, columnname): """ Drop the NOT NULL constraint on the given column. """ cr.execute(SQL( "ALTER TABLE %s ALTER COLUMN %s DROP NOT NULL", SQL.identifier(tablename), SQL.identifier(columnname), )) _schema.debug("Table %r: column %r: dropped constraint NOT NULL", tablename, columnname) def constraint_definition(cr, tablename, constraintname): """ Return the given constraint's definition. """ cr.execute(SQL(""" SELECT COALESCE(d.description, pg_get_constraintdef(c.oid)) FROM pg_constraint c JOIN pg_class t ON t.oid = c.conrelid LEFT JOIN pg_description d ON c.oid = d.objoid WHERE t.relname = %s AND conname = %s """, tablename, constraintname)) return cr.fetchone()[0] if cr.rowcount else None def add_constraint(cr, tablename, constraintname, definition): """ Add a constraint on the given table. 
""" if "%" in definition: definition = definition.replace("%", "%%") query1 = SQL( "ALTER TABLE %s ADD CONSTRAINT %s %s", SQL.identifier(tablename), SQL.identifier(constraintname), SQL(definition), ) query2 = SQL( "COMMENT ON CONSTRAINT %s ON %s IS %s", SQL.identifier(constraintname), SQL.identifier(tablename), definition, ) try: with cr.savepoint(flush=False): cr.execute(query1, log_exceptions=False) cr.execute(query2, log_exceptions=False) _schema.debug("Table %r: added constraint %r as %s", tablename, constraintname, definition) except Exception: raise Exception("Table %r: unable to add constraint %r as %s", tablename, constraintname, definition) def drop_constraint(cr, tablename, constraintname): """ drop the given constraint. """ try: with cr.savepoint(flush=False): cr.execute(SQL( "ALTER TABLE %s DROP CONSTRAINT %s", SQL.identifier(tablename), SQL.identifier(constraintname), )) _schema.debug("Table %r: dropped constraint %r", tablename, constraintname) except Exception: _schema.warning("Table %r: unable to drop constraint %r!", tablename, constraintname) def add_foreign_key(cr, tablename1, columnname1, tablename2, columnname2, ondelete): """ Create the given foreign key, and return ``True``. """ cr.execute(SQL( "ALTER TABLE %s ADD FOREIGN KEY (%s) REFERENCES %s(%s) ON DELETE %s", SQL.identifier(tablename1), SQL.identifier(columnname1), SQL.identifier(tablename2), SQL.identifier(columnname2), SQL(ondelete), )) _schema.debug("Table %r: added foreign key %r references %r(%r) ON DELETE %s", tablename1, columnname1, tablename2, columnname2, ondelete) return True def get_foreign_keys(cr, tablename1, columnname1, tablename2, columnname2, ondelete): deltype = _CONFDELTYPES[ondelete.upper()] cr.execute(SQL( """ SELECT fk.conname as name FROM pg_constraint AS fk JOIN pg_class AS c1 ON fk.conrelid = c1.oid JOIN pg_class AS c2 ON fk.confrelid = c2.oid JOIN pg_attribute AS a1 ON a1.attrelid = c1.oid AND fk.conkey[1] = a1.attnum JOIN pg_attribute AS a2 ON a2.attrelid = c2.oid AND fk.confkey[1] = a2.attnum WHERE fk.contype = 'f' AND c1.relname = %s AND a1.attname = %s AND c2.relname = %s AND a2.attname = %s AND fk.confdeltype = %s """, tablename1, columnname1, tablename2, columnname2, deltype, )) return [r[0] for r in cr.fetchall()] def fix_foreign_key(cr, tablename1, columnname1, tablename2, columnname2, ondelete): """ Update the foreign keys between tables to match the given one, and return ``True`` if the given foreign key has been recreated. """ # Do not use 'information_schema' here, as those views are awfully slow! deltype = _CONFDELTYPES.get(ondelete.upper(), 'a') cr.execute(SQL( """ SELECT con.conname, c2.relname, a2.attname, con.confdeltype as deltype FROM pg_constraint as con, pg_class as c1, pg_class as c2, pg_attribute as a1, pg_attribute as a2 WHERE con.contype='f' AND con.conrelid=c1.oid AND con.confrelid=c2.oid AND array_lower(con.conkey, 1)=1 AND con.conkey[1]=a1.attnum AND array_lower(con.confkey, 1)=1 AND con.confkey[1]=a2.attnum AND a1.attrelid=c1.oid AND a2.attrelid=c2.oid AND c1.relname=%s AND a1.attname=%s """, tablename1, columnname1, )) found = False for fk in cr.fetchall(): if not found and fk[1:] == (tablename2, columnname2, deltype): found = True else: drop_constraint(cr, tablename1, fk[0]) if not found: return add_foreign_key(cr, tablename1, columnname1, tablename2, columnname2, ondelete) def index_exists(cr, indexname): """ Return whether the given index exists. 
""" cr.execute(SQL("SELECT 1 FROM pg_indexes WHERE indexname=%s", indexname)) return cr.rowcount def check_index_exist(cr, indexname): assert index_exists(cr, indexname), f"{indexname} does not exist" def create_index(cr, indexname, tablename, expressions, method='btree', where=''): """ Create the given index unless it exists. """ if index_exists(cr, indexname): return cr.execute(SQL( "CREATE INDEX %s ON %s USING %s (%s)%s", SQL.identifier(indexname), SQL.identifier(tablename), SQL(method), SQL(", ").join(SQL(expression) for expression in expressions), SQL(" WHERE %s", SQL(where)) if where else SQL(), )) _schema.debug("Table %r: created index %r (%s)", tablename, indexname, ", ".join(expressions)) def create_unique_index(cr, indexname, tablename, expressions): """ Create the given index unless it exists. """ if index_exists(cr, indexname): return cr.execute(SQL( "CREATE UNIQUE INDEX %s ON %s (%s)", SQL.identifier(indexname), SQL.identifier(tablename), SQL(", ").join(SQL(expression) for expression in expressions), )) _schema.debug("Table %r: created index %r (%s)", tablename, indexname, ", ".join(expressions)) def drop_index(cr, indexname, tablename): """ Drop the given index if it exists. """ cr.execute(SQL("DROP INDEX IF EXISTS %s", SQL.identifier(indexname))) _schema.debug("Table %r: dropped index %r", tablename, indexname) def drop_view_if_exists(cr, viewname): kind = table_kind(cr, viewname) if kind == TableKind.View: cr.execute(SQL("DROP VIEW %s CASCADE", SQL.identifier(viewname))) elif kind == TableKind.Materialized: cr.execute(SQL("DROP MATERIALIZED VIEW %s CASCADE", SQL.identifier(viewname))) def escape_psql(to_escape): return to_escape.replace('\\', r'\\').replace('%', '\%').replace('_', '\_') def pg_varchar(size=0): """ Returns the VARCHAR declaration for the provided size: * If no size (or an empty or negative size is provided) return an 'infinite' VARCHAR * Otherwise return a VARCHAR(n) :param int size: varchar size, optional :rtype: str """ if size: if not isinstance(size, int): raise ValueError("VARCHAR parameter should be an int, got %s" % type(size)) if size > 0: return 'VARCHAR(%d)' % size return 'VARCHAR' def reverse_order(order): """ Reverse an ORDER BY clause """ items = [] for item in order.split(','): item = item.lower().split() direction = 'asc' if item[1:] == ['desc'] else 'desc' items.append('%s %s' % (item[0], direction)) return ', '.join(items) def increment_fields_skiplock(records, *fields): """ Increment 'friendly' the given `fields` of the current `records`. If record is locked, we just skip the update. It doesn't invalidate the cache since the update is not critical. :param records: recordset to update :param fields: integer fields to increment :returns: whether the specified fields were incremented on any record. :rtype: bool """ if not records: return False for field in fields: assert records._fields[field].type == 'integer' cr = records._cr tablename = records._table cr.execute(SQL( """ UPDATE %s SET %s WHERE id IN (SELECT id FROM %s WHERE id = ANY(%s) FOR UPDATE SKIP LOCKED) """, SQL.identifier(tablename), SQL(', ').join( SQL("%s = COALESCE(%s, 0) + 1", SQL.identifier(field), SQL.identifier(field)) for field in fields ), SQL.identifier(tablename), records.ids, )) return bool(cr.rowcount) def value_to_translated_trigram_pattern(value): """ Escape value to match a translated field's trigram index content The trigram index function jsonb_path_query_array("column_name", '$.*')::text uses all translations' representations to build the indexed text. 

def value_to_translated_trigram_pattern(value):
    """ Escape value to match a translated field's trigram index content.

    The trigram index function ``jsonb_path_query_array("column_name", '$.*')::text``
    uses all translations' representations to build the indexed text. So the
    original text needs to be JSON-escaped correctly to match it.

    :param str value: value provided in domain
    :return: a pattern to match the indexed text
    """
    if len(value) < 3:
        # matching less than 3 characters will not take advantage of the index
        return '%'

    # apply JSON escaping to value; the argument ensure_ascii=False prevents
    # json.dumps from escaping unicode to ascii, which is consistent with the
    # index function jsonb_path_query_array("column_name", '$.*')::text
    json_escaped = json.dumps(value, ensure_ascii=False)[1:-1]

    # apply PG wildcard escaping to the JSON-escaped text
    wildcard_escaped = re.sub(r'(_|%|\\)', r'\\\1', json_escaped)

    # add wildcards around it to get the pattern
    return f"%{wildcard_escaped}%"


def pattern_to_translated_trigram_pattern(pattern):
    """ Escape pattern to match a translated field's trigram index content.

    The trigram index function ``jsonb_path_query_array("column_name", '$.*')::text``
    uses all translations' representations to build the indexed text. So the
    original pattern needs to be JSON-escaped correctly to match it.

    :param str pattern: value provided in domain
    :return: a pattern to match the indexed text
    """
    # find the parts around (non-escaped) wildcard characters (_, %)
    sub_patterns = re.findall(r'''
        (
            (?:.)*?           # 0 or more characters including the newline character
            (?<!\\)(?:\\\\)*  # 0 or an even number of backslashes, so that the
                              # following wildcard character is not escaped
        )
        (?:_|%|$)             # a non-escaped wildcard character or the end of the string
    ''', pattern, flags=re.VERBOSE | re.DOTALL)

    # unescape the backslash-escaped characters (\% becomes %)
    sub_texts = [re.sub(r'\\(.)', r'\1', t, flags=re.DOTALL) for t in sub_patterns]

    # apply JSON escaping to the sub-texts; texts shorter than 3 characters
    # cannot take advantage of the trigram index and are dropped
    json_escaped = [json.dumps(t, ensure_ascii=False)[1:-1] for t in sub_texts if len(t) >= 3]

    # apply PG wildcard escaping to JSON-escaped texts (% becomes \%)
    wildcard_escaped = [re.sub(r'(_|%|\\)', r'\\\1', t) for t in json_escaped]

    # replace the original wildcard characters by %
    return f"%{'%'.join(wildcard_escaped)}%" if wildcard_escaped else "%"


def make_identifier(identifier: str) -> str:
    """ Return ``identifier``, possibly modified to fit PostgreSQL's identifier
    size limitation. If too long, ``identifier`` is truncated and padded with
    a hash to make it mostly unique.
    """
    # check whether the length exceeds the PostgreSQL limit of 63 characters
    if len(identifier) > 63:
        # We have to fit a crc32 hash and one underscore into a 63 character
        # alias. The remaining space we can use to add a human readable prefix.
        return f"{identifier[:54]}_{crc32(identifier.encode()):08x}"
    return identifier


def make_index_name(table_name: str, column_name: str) -> str:
    """ Return an index name according to conventions for the given table and
    column.
    """
    return make_identifier(f"{table_name}__{column_name}_index")
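

# Illustrative examples of the identifier helpers:
#
#   make_index_name('res_partner', 'name')  # 'res_partner__name_index'
#   make_identifier('a' * 70)               # 54 chars + '_' + 8-char crc32 hex (63 total)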