golden hour
/opt/alt/python37/lib/python3.7/site-packages/alembic/ddl
⬆️ Go Up
Upload
File/Folder
Size
Actions
__init__.py
125 B
Del
OK
__pycache__
-
Del
OK
base.py
5.79 KB
Del
OK
impl.py
11.8 KB
Del
OK
mssql.py
7.99 KB
Del
OK
mysql.py
11.41 KB
Del
OK
oracle.py
2.62 KB
Del
OK
postgresql.py
5.2 KB
Del
OK
sqlite.py
3.44 KB
Del
OK
Edit: impl.py
from sqlalchemy import schema, text from sqlalchemy import types as sqltypes from ..util.compat import ( string_types, text_type, with_metaclass ) from ..util import sqla_compat from .. import util from . import base class ImplMeta(type): def __init__(cls, classname, bases, dict_): newtype = type.__init__(cls, classname, bases, dict_) if '__dialect__' in dict_: _impls[dict_['__dialect__']] = cls return newtype _impls = {} class DefaultImpl(with_metaclass(ImplMeta)): """Provide the entrypoint for major migration operations, including database-specific behavioral variances. While individual SQL/DDL constructs already provide for database-specific implementations, variances here allow for entirely different sequences of operations to take place for a particular migration, such as SQL Server's special 'IDENTITY INSERT' step for bulk inserts. """ __dialect__ = 'default' transactional_ddl = False command_terminator = ";" def __init__(self, dialect, connection, as_sql, transactional_ddl, output_buffer, context_opts): self.dialect = dialect self.connection = connection self.as_sql = as_sql self.literal_binds = context_opts.get('literal_binds', False) if self.literal_binds and not util.sqla_08: util.warn("'literal_binds' flag not supported in SQLAlchemy 0.7") self.literal_binds = False self.output_buffer = output_buffer self.memo = {} self.context_opts = context_opts if transactional_ddl is not None: self.transactional_ddl = transactional_ddl if self.literal_binds: if not self.as_sql: raise util.CommandError( "Can't use literal_binds setting without as_sql mode") @classmethod def get_by_dialect(cls, dialect): return _impls[dialect.name] def static_output(self, text): self.output_buffer.write(text_type(text + "\n\n")) self.output_buffer.flush() def requires_recreate_in_batch(self, batch_op): """Return True if the given :class:`.BatchOperationsImpl` would need the table to be recreated and copied in order to proceed. Normally, only returns True on SQLite when operations other than add_column are present. """ return False def prep_table_for_batch(self, table): """perform any operations needed on a table before a new one is created to replace it in batch mode. the PG dialect uses this to drop constraints on the table before the new one uses those same names. """ @property def bind(self): return self.connection def _exec(self, construct, execution_options=None, multiparams=(), params=util.immutabledict()): if isinstance(construct, string_types): construct = text(construct) if self.as_sql: if multiparams or params: # TODO: coverage raise Exception("Execution arguments not allowed with as_sql") if self.literal_binds and not isinstance( construct, schema.DDLElement): compile_kw = dict(compile_kwargs={"literal_binds": True}) else: compile_kw = {} self.static_output(text_type( construct.compile(dialect=self.dialect, **compile_kw) ).replace("\t", " ").strip() + self.command_terminator) else: conn = self.connection if execution_options: conn = conn.execution_options(**execution_options) return conn.execute(construct, *multiparams, **params) def execute(self, sql, execution_options=None): self._exec(sql, execution_options) def alter_column(self, table_name, column_name, nullable=None, server_default=False, name=None, type_=None, schema=None, autoincrement=None, existing_type=None, existing_server_default=None, existing_nullable=None, existing_autoincrement=None ): if autoincrement is not None or existing_autoincrement is not None: util.warn( "autoincrement and existing_autoincrement " "only make sense for MySQL") if nullable is not None: self._exec(base.ColumnNullable( table_name, column_name, nullable, schema=schema, existing_type=existing_type, existing_server_default=existing_server_default, existing_nullable=existing_nullable, )) if server_default is not False: self._exec(base.ColumnDefault( table_name, column_name, server_default, schema=schema, existing_type=existing_type, existing_server_default=existing_server_default, existing_nullable=existing_nullable, )) if type_ is not None: self._exec(base.ColumnType( table_name, column_name, type_, schema=schema, existing_type=existing_type, existing_server_default=existing_server_default, existing_nullable=existing_nullable, )) # do the new name last ;) if name is not None: self._exec(base.ColumnName( table_name, column_name, name, schema=schema, existing_type=existing_type, existing_server_default=existing_server_default, existing_nullable=existing_nullable, )) def add_column(self, table_name, column, schema=None): self._exec(base.AddColumn(table_name, column, schema=schema)) def drop_column(self, table_name, column, schema=None, **kw): self._exec(base.DropColumn(table_name, column, schema=schema)) def add_constraint(self, const): if const._create_rule is None or \ const._create_rule(self): self._exec(schema.AddConstraint(const)) def drop_constraint(self, const): self._exec(schema.DropConstraint(const)) def rename_table(self, old_table_name, new_table_name, schema=None): self._exec(base.RenameTable(old_table_name, new_table_name, schema=schema)) def create_table(self, table): if util.sqla_07: table.dispatch.before_create(table, self.connection, checkfirst=False, _ddl_runner=self) self._exec(schema.CreateTable(table)) if util.sqla_07: table.dispatch.after_create(table, self.connection, checkfirst=False, _ddl_runner=self) for index in table.indexes: self._exec(schema.CreateIndex(index)) def drop_table(self, table): self._exec(schema.DropTable(table)) def create_index(self, index): self._exec(schema.CreateIndex(index)) def drop_index(self, index): self._exec(schema.DropIndex(index)) def bulk_insert(self, table, rows, multiinsert=True): if not isinstance(rows, list): raise TypeError("List expected") elif rows and not isinstance(rows[0], dict): raise TypeError("List of dictionaries expected") if self.as_sql: for row in rows: self._exec(table.insert(inline=True).values(**dict( (k, sqla_compat._literal_bindparam( k, v, type_=table.c[k].type) if not isinstance( v, sqla_compat._literal_bindparam) else v) for k, v in row.items() ))) else: # work around http://www.sqlalchemy.org/trac/ticket/2461 if not hasattr(table, '_autoincrement_column'): table._autoincrement_column = None if rows: if multiinsert: self._exec(table.insert(inline=True), multiparams=rows) else: for row in rows: self._exec(table.insert(inline=True).values(**row)) def compare_type(self, inspector_column, metadata_column): conn_type = inspector_column.type metadata_type = metadata_column.type metadata_impl = metadata_type.dialect_impl(self.dialect) # work around SQLAlchemy bug "stale value for type affinity" # fixed in 0.7.4 metadata_impl.__dict__.pop('_type_affinity', None) if hasattr(metadata_impl, "compare_against_backend"): comparison = metadata_impl.compare_against_backend( self.dialect, conn_type) if comparison is not None: return not comparison if conn_type._compare_type_affinity( metadata_impl ): comparator = _type_comparators.get(conn_type._type_affinity, None) return comparator and comparator(metadata_type, conn_type) else: return True def compare_server_default(self, inspector_column, metadata_column, rendered_metadata_default, rendered_inspector_default): return rendered_inspector_default != rendered_metadata_default def correct_for_autogen_constraints(self, conn_uniques, conn_indexes, metadata_unique_constraints, metadata_indexes): pass def _compat_autogen_column_reflect(self, inspector): if util.sqla_08: return self.autogen_column_reflect else: def adapt(table, column_info): return self.autogen_column_reflect( inspector, table, column_info) return adapt def correct_for_autogen_foreignkeys(self, conn_fks, metadata_fks): pass def autogen_column_reflect(self, inspector, table, column_info): """A hook that is attached to the 'column_reflect' event for when a Table is reflected from the database during the autogenerate process. Dialects can elect to modify the information gathered here. """ def start_migrations(self): """A hook called when :meth:`.EnvironmentContext.run_migrations` is called. Implementations can set up per-migration-run state here. """ def emit_begin(self): """Emit the string ``BEGIN``, or the backend-specific equivalent, on the current connection context. This is used in offline mode and typically via :meth:`.EnvironmentContext.begin_transaction`. """ self.static_output("BEGIN" + self.command_terminator) def emit_commit(self): """Emit the string ``COMMIT``, or the backend-specific equivalent, on the current connection context. This is used in offline mode and typically via :meth:`.EnvironmentContext.begin_transaction`. """ self.static_output("COMMIT" + self.command_terminator) def _string_compare(t1, t2): return \ t1.length is not None and \ t1.length != t2.length def _numeric_compare(t1, t2): return \ ( t1.precision is not None and t1.precision != t2.precision ) or \ ( t1.scale is not None and t1.scale != t2.scale ) _type_comparators = { sqltypes.String: _string_compare, sqltypes.Numeric: _numeric_compare }
Save