/* $Id: bf083ea0858d596648d312d4145bb8a608bf88c9 $ Part of SWI-Prolog Author: Mike Elston Matt Lilley E-mail: matt.s.lilley@gmail.com WWW: http://www.swi-prolog.org Copyright (C): 1985-2014, Mike Elston, Matt Lilley This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this library; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA As a special exception, if you link this library with other files, compiled with a Free Software compiler, to produce an executable, this library does not by itself cause the resulting executable to be covered by the GNU General Public License. This exception does not however invalidate any other reasons why the executable file might be covered by the GNU General Public License. PostgreSQL is a trademark of the PostgreSQL Global Development Group. Microsoft, SQL Server, and Windows are either registered trademarks or trademarks of Microsoft Corporation in the United States and/or other countries. SQLite is a registered trademark of Hipp, Wyrick & Company, Inc in the United States. All other trademarks or registered trademarks are the property of their respective owners. */ :-module(cql_database, [get_transaction_context/5, odbc_execute_with_statement_cache/7, save_database_event/6, application_value_to_odbc_value/7, odbc_value_to_application_value/5, cql_transaction/3, database_transaction_query_info/3, current_transaction_id/1, transaction_active/0, register_database_connection_details/2, resolve_deadlock/1, database_connection_details/2, odbc_connection_call/3, update_history/14, odbc_cleanup_and_disconnect/1]). :-use_module(library(cql/cql)). :-dynamic database_connection_details/2. :-volatile database_connection_details/2. :-thread_local database_event/6, transaction_active/0, transaction_context/4, database_transaction_query_info/3. get_transaction_context(TransactionId, TrxId, AccessToken, TransactionTimestamp, Connection) :- ( transaction_context(TransactionId_, AccessToken_, TransactionTimestamp_, Connection_) -> TransactionId = TransactionId_, TrxId = {null}, AccessToken = AccessToken_, TransactionTimestamp = TransactionTimestamp_, Connection = Connection_ ; otherwise -> throw(no_database_transaction_active) ). :-meta_predicate odbc_connection_call(+, -, 0). :-thread_local % odbc_connection_available(Schema, Connection) odbc_connection_available/2, % odbc_connection_in_use(Schema) odbc_connection_in_use/1. :-multifile(cql_max_db_connections_hook/1). :-multifile(cql:odbc_connection_complete_hook/3). odbc_connection_call(Schema, Connection, Goal) :- ( retract(odbc_connection_available(Schema, Connection)) -> % Get a connection from the pool assert(odbc_connection_in_use(Schema)), setup_call_cleanup(true, Goal, ( odbc_end_transaction(Connection, rollback), % Ensure connections in the pool have no pending results dangling retract(odbc_connection_in_use(Schema)), assert(odbc_connection_available(Schema, Connection)))) % Put connection back in the pool ; aggregate_all(r(count), odbc_connection_in_use(Schema), r(N)), ( cql_max_db_connections_hook(MaxDbConnections)-> true ; otherwise-> MaxDbConnections = 10 ), N >= MaxDbConnections -> thread_self(ThreadId), cql_error(too_many_schema_connections, 'Too many connections on ~w: Maximum is ~w', [ThreadId, MaxDbConnections]) ; database_connection_details(Schema, ConnectionDetails) -> ( ConnectionDetails = driver_string(DriverString) -> true ; ConnectionDetails = dsn(Dsn, Username, Password) -> gethostname(HostName), format(atom(DriverString), 'DSN=~w;UID=~w;PWD=~w;WSID=~w;', [Dsn, Username, Password, HostName]) ; ConnectionDetails = dsn(Dsn) -> gethostname(HostName), format(atom(DriverString), 'DSN=~w;WSID=~w;', [Dsn, HostName]) ; otherwise -> throw(invalid_connection_details(ConnectionDetails)) ), odbc_connect(-, Connection, [driver_string(DriverString), silent(true), null({null}), auto_commit(false), wide_column_threshold(8000), mars(true)]), % In theory this is not needed following bug-5181 but see comment in predicate description thread_at_exit(odbc_cleanup_and_disconnect(Connection)), assert(odbc_connection_available(Schema, Connection)), ignore(cql:odbc_connection_complete_hook(Schema, ConnectionDetails, Connection)), odbc_connection_call(Schema, Connection, Goal) ; otherwise -> throw(no_database_connection_details) ). %% odbc_cleanup_and_disconnect(+Connection) is det. % % Rollback the current transaction, retract and free prepared statements, then disconnect. % % To avoid leaks, all exiting threads with database connections should call this. See odbc_connection_call/2 (thread_at_exit/1) % % Note that any exception inside odbc_cleanup_and_disconnect/1 will result in it not going on to the next step. % % We log exceptions to the event log because exceptions at this level are associated with the server process crashing % and the SE log is unlikely to capture anything useful. odbc_cleanup_and_disconnect(Connection) :- catch_all(odbc_cleanup_and_disconnect_1(Connection), E, ( thread_self(ThreadId), cql_log([], error, '[~w] odbc_cleanup_and_disconnect/1 : ~w', [ThreadId, E]))). odbc_cleanup_and_disconnect_1(Connection) :- thread_self(ThreadId), debug(odbc_cleanup, 'BEFORE [~w] : ~w', [ThreadId, odbc_end_transaction(Connection, rollback)]), odbc_end_transaction(Connection, rollback), debug(odbc_cleanup, 'AFTER [~w] : ~w', [ThreadId, odbc_end_transaction(Connection, rollback)]), forall(retract(cached_prepared_odbc_statement(_, _, Connection, _, _, CachedStatement, _)), ( debug(odbc_cleanup, 'BEFORE [~w] : ~w', [ThreadId, odbc_free_statement(CachedStatement)]), odbc_free_statement(CachedStatement), debug(odbc_cleanup, 'AFTER [~w] : ~w', [ThreadId, odbc_free_statement(CachedStatement)]) ) ), retractall(lru_key(_)), retractall(lru_statement(_)), debug(odbc_cleanup, 'BEFORE [~w] : ~w', [ThreadId, odbc_disconnect(Connection)]), odbc_disconnect(Connection), debug(odbc_cleanup, 'AFTER [~w] : ~w', [ThreadId, odbc_disconnect(Connection)]), % Get rid of these last so there is some evidence if odbc_disconnect/1 does not work retractall(sql_server_spid(Connection, _, _, _)). %! odbc_execute_with_statement_cache(+Connection, +FileName, +LineNumber, +Sql, +OdbcParameters, +OdbcParameterDataTypes, -Row) :-thread_local % cached_prepared_odbc_statement(Sql, OdbcParameterDataTypes, Connection, FileName, LineNumber, Statement, MutexId) % Note that we need OdbcParameterDataTypes in here so we can look up the correct statement. Consider: % "SELECT * FROM some_table WHERE some_column = ?" % This can be compiled with varchar(4) as the datatype, to get statement S, % If we then want to do a query where user_id = 'ERIC' we are going to get a runtime type error. % Ordinarily this isn't a problem because the domain is well-established at compile-time, but this % is not the case when dealing with dynamic tables, specifically #cql_in. cached_prepared_odbc_statement/7. :-thread_local lru_statement/1, statement_locked/1, lru_key/1. max_lru_size(4000). % This is called with the current statement locked evict_cache_entries(_, 0):- !. evict_cache_entries(Key, N):- N > 0, retract(lru_statement(MutexId)), % This statement cannot be locked unless the cache size is extremely small, since we JUST cycled the current statement to the bottom of the stack ( statement_locked(MutexId)-> % Just do nothing in this case. We will get it next time. Besides, it is very unlikely to happen true ; otherwise-> thread_self(ThreadId), retract(cached_prepared_odbc_statement(Sql, _, _, _, _, Statement, MutexId)), odbc_free_statement(Statement), debug(odbc_statement_cache, 'CACHE-EVICT [~w] ~w : ~@', [ThreadId, Statement, trimmed_sql(Sql, 80)]), flag(Key, X, X-1) ), NN is N-1, !, evict_cache_entries(Key, NN). odbc_execute_with_statement_cache(Connection, _, _, Sql, OdbcParameters, OdbcParameterDataTypes, Row) :- cached_prepared_odbc_statement(Sql, OdbcParameterDataTypes, Connection, _, _, Statement, MutexId), !, setup_call_cleanup(assert(statement_locked(MutexId)), ( thread_self(ThreadId), retract(lru_statement(MutexId)), assertz(lru_statement(MutexId)), debug(odbc_statement_cache, 'CACHE-HIT [~w] ~w : ~@', [ThreadId, Statement, trimmed_sql(Sql, 80)]), odbc_execute_with_statistics(Statement, OdbcParameters, OdbcParameterDataTypes, Row) ), retract(statement_locked(MutexId))). odbc_execute_with_statement_cache(Connection, FileName, LineNumber, Sql, OdbcParameters, OdbcParameterDataTypes, Row) :- thread_self(ThreadId), debug(odbc_statement_cache, 'CACHE-MISS [~w] : ~@', [ThreadId, trimmed_sql(Sql, 80)]), odbc_prepare(Connection, Sql, OdbcParameterDataTypes, Statement, []), gensym(statement_lock_, MutexId), ( lru_key(Key)-> true ; otherwise-> gensym(lru_key_, Key), assert(lru_key(Key)) ), setup_call_cleanup(assert(statement_locked(MutexId)), ( assertz(cached_prepared_odbc_statement(Sql, OdbcParameterDataTypes, Connection, FileName, LineNumber, Statement, MutexId)), assertz(lru_statement(MutexId)), max_lru_size(MaxSize), flag(Key, CacheSize, CacheSize+1), ( CacheSize >= MaxSize-> Delta is CacheSize - MaxSize, evict_cache_entries(Key, Delta) ; otherwise-> true ), flag(Key, Z, Z), debug(odbc_statement_cache, 'CACHE-STORE [~w] ~w, ~w : ~@', [ThreadId, Statement, MutexId, trimmed_sql(Sql, 60)]), odbc_execute_with_statistics(Statement, OdbcParameters, OdbcParameterDataTypes, Row) ), retract(statement_locked(MutexId))). %% save_database_event(+AccessToken, %% +EventType, %% +Schema, %% +TableName, %% +PrimaryKeyColumnName, %% +PrimaryKey). % % Need this because its called from the caller's module and we want the fact asserted % in this module save_database_event(AccessToken, % + EventType, % + Schema, % + TableName, % + PrimaryKeyColumnName, % + PrimaryKey) :- % + ( database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey) -> % No point storing an event more than once true ; otherwise-> assert(database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey)) ). :-meta_predicate(cql_transaction(+, +, 0)). cql_transaction(Schema, AccessToken, Goal):- thread_self(ThreadId), setup_call_cleanup(assert(transaction_active), cql_transaction_1(Schema, AccessToken, Goal, DatabaseEventsSet), ( retractall(database_transaction_query_info(ThreadId, _, _)), retractall(transaction_context(_, _, _, _)), retractall(database_event(_, _, _, _, _, _)), flag(transaction_count, Count, Count+1), retractall(transaction_active))), % Removed last so if transaction_active succeeds while executing Goal then the other facts are still available to Goal cql_process_database_events(DatabaseEventsSet). cql_transaction_1(Schema, AccessToken, Goal, DatabaseEventsSet):- ( transaction_context(ExistingTransactionId, _, _, _) -> throw(database_transaction_already_in_progress(ExistingTransactionId)) ; otherwise -> true ), resolve_deadlock(cql_transaction_2(Schema, AccessToken, Goal, DatabaseEventsSet)). cql_transaction_2(Schema, AccessToken, Goal, DatabaseEventsSet) :- odbc_connection_call(Schema, Connection, ( ( dbms(Schema, 'Microsoft SQL Server')-> odbc_query(Connection, 'SELECT CONVERT(VARCHAR(36), NEWID())', row(TransactionId)) ; dbms(Schema, 'PostgreSQL') -> odbc_query(Connection, 'SELECT uuid_generate_v1()', row(TransactionId)) ; dbms(Schema, 'SQLite') -> odbc_query(Connection, 'SELECT substr(u,1,8)||\'-\'||substr(u,9,4)||\'-4\'||substr(u,13,3)||\'-\'||v||substr(u,17,3)||\'-\'||substr(u,21,12) from (select lower(hex(randomblob(16))) as u, substr(\'89ab\',abs(random()) % 4 + 1, 1) as v)', row(TransactionId)) ; otherwise -> throw(no_dbms_for_schema(Schema)) ), dbms(Schema, DBMS), store_transaction_info(AccessToken, Connection, DBMS, Goal), get_time(ExecutionTime), assert(transaction_context(TransactionId, AccessToken, ExecutionTime, Connection)), ( cql_transaction_3(Goal, Connection, TransactionId, AccessToken, DatabaseEventsSet) -> true ; otherwise -> % odbc_connection_call/3 always rolls back so no need for explicit rollback here log_transaction_state(AccessToken, TransactionId, transaction_rolled_back_on_logic_failure), fail ))). :-meta_predicate cql_transaction_3(0, +, +, +, -). cql_transaction_3(Goal, Connection, TransactionId, AccessToken, DatabaseEventsSet) :- log_transaction_state(AccessToken, TransactionId, transaction_starting), catch(Goal, E, Error = E), % Note that this previously did a setof/3. This reorders events, which breaks event consolidation findall(database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey), retract(database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey)), DatabaseEvents), % list_to_set/2 is NlogN and preserves order list_to_set(DatabaseEvents, DatabaseEventsSet), ( var(Error) -> odbc_end_transaction(Connection, commit), log_transaction_state(AccessToken, TransactionId, transaction_committed) ; otherwise -> % odbc_connection_call/3 always rolls back so no need for explicit rollback here log_transaction_state(AccessToken, TransactionId, transaction_rolled_back_on_error), throw(Error) ). %% resolve_deadlock(:Goal) % % Call Goal as in catch/3. If a deadlock ('40001') error occurs then Goal is *|called again|* immediately if another transaction has completed in the % time since Goal was called, since that transaction may well have been the reason for the deadlock. % If no other transaction has completed Goal is *|called again|* after a random delay of 0.0 to 2.0 seconds. The maximum number of retries % is specified by maximum_deadlock_retries/1. It is important to note that the deadlock mechanism actually *|retries|* Goal, i.e. it calls it % *|again|*. % % *|Use this only when you are sure Goal has no non-database side effects (assert/retract, file operations etc)|* % % Originally developed for use inside cql_transaction/3, resolve_deadlock/1 can also be used to ensure non-transactional % operations can resolve deadlocks. :-meta_predicate resolve_deadlock(0). resolve_deadlock(Goal) :- thread_self(ThreadId), flag(transaction_count, InitialCount, InitialCount), maximum_deadlock_retries(MaximumDeadlockRetries), between(1, MaximumDeadlockRetries, RetryCount), % BTP for deadlock retry ( RetryCount >= MaximumDeadlockRetries -> cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLUTION_FAILED\tCOULD NOT RESOLVE deadlock on thread \'~w\'. Goal: ~w', [ThreadId, Goal]), throw(deadlock_retry_count_exceeded(MaximumDeadlockRetries)) ; RetryCount > 1 -> % Check if another transaction has completed. Note complete means committed -or- rolled back flag(transaction_count, CurrentCount, CurrentCount), ( CurrentCount =:= InitialCount -> Flag = no_other_transaction_completed ; otherwise -> Flag = another_transaction_completed ) ; otherwise -> Flag = no_deadlock ), ( Flag == no_other_transaction_completed -> Delay is ( 2 << RetryCount) / 1000.0, % Exponential backoff up to 2.048s sleep(Delay), cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLUTION_ATTEMPT\tRETRYING deadlocked transaction on thread \'~w\'(attempt ~w). Initiated by EXPIRY of RANDOM WAIT of ~w seconds.', [ThreadId, RetryCount, Delay]) ; Flag == another_transaction_completed -> cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLUTION_ATTEMPT\tRETRYING deadlocked transaction on thread \'~w\' (attempt ~w). Initiated by COMPLETION of a TRANSACTION on another thread.', [ThreadId, RetryCount]) ; otherwise -> true ), catch_all((Goal -> LogicalStatus = 1 ; otherwise -> true ), error(odbc('40001', _, _), _), ( cql_log([debug(deadlocks)], warning, 'DEADLOCK_DETECTED\tThread \'~w\' selected as DEADLOCK VICTIM. Goal: ~w', [ThreadId, Goal]), retractall(database_transaction_query_info(ThreadId, _, _)), retractall(transaction_context(_, _, _, _)), retractall(database_event(_, _, _, _, _, _)), fail)), ( RetryCount > 1 -> cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLVED\tdeadlocked transaction on thread \'~w\' RESOLVED (attempt ~w).', [ThreadId, RetryCount]) ; otherwise -> true ), !, % Don't want to backtrack into the deadlock retry between/3 when Goal fails LogicalStatus == 1. %% maximum_deadlock_retries(?MaximumDeadlockRetries:integer). % % The maximum number of time to retry a deadlocked Goal maximum_deadlock_retries(10). % log_transaction_state(+AccessToken, +TransactionId, +TransactionState) log_transaction_state(AccessToken, TransactionId, TransactionState) :- cql_access_token_to_user_id(AccessToken, UserId), upcase_atom(TransactionState, TransactionStateUc), cql_log([], informational, '\t~p\t~p\t~p', [UserId, TransactionId, TransactionStateUc]). %% register_database_connection_details(+Schema:atom, +ConnectionDetails) is det. % % This should be called once to register the database connection details. % % @param ConnectionDetails driver_string(DriverString) or dsn(Dsn, Username, Password) register_database_connection_details(Schema, ConnectionDetails) :- assert(database_connection_details(Schema, ConnectionDetails)). update_history(Schema, TableName, AttributeName, PrimaryKeyAttributeName, PrimaryKeyValue, ApplicationValueBefore, ApplicationValueAfter, AccessToken, Info, TransactionId, TransactionTimestamp, ThreadId, Connection, Goal):- ignore(cql_update_history_hook(Schema, TableName, AttributeName, PrimaryKeyAttributeName, PrimaryKeyValue, ApplicationValueBefore, ApplicationValueAfter, AccessToken, Info, TransactionId, TransactionTimestamp, ThreadId, Connection, Goal)). %% application_value_to_odbc_value(+ApplicationValue, +OdbcDataType, +Schema, +TableName, +ColumnName, +Qualifiers, -OdbcValue). :-multifile(cql:application_value_to_odbc_value_hook/7). application_value_to_odbc_value(ApplicationValue, OdbcDataType, Schema, TableName, ColumnName, Qualifiers, OdbcValue):- ( var(ApplicationValue)-> throw(instantiation_error(ApplicationValue)) ; cql:application_value_to_odbc_value_hook(OdbcDataType, Schema, TableName, ColumnName, Qualifiers, ApplicationValue, OdbcValue)-> true ; otherwise-> OdbcValue = ApplicationValue ). odbc_numeric_precision_limit(27). %% odbc_value_to_application_value(+Schema, +TableSpec, +ColumnName, +OdbcValue, ?ApplicationValue). :-multifile(cql:odbc_value_to_application_value_hook/7). odbc_value_to_application_value(Schema, TableSpec, ColumnName, OdbcValue, ApplicationValue):- cql_data_type(Schema, TableSpec, ColumnName, DatabaseDataType, _, _, _, Domain, _, _), !, ( cql:odbc_value_to_application_value_hook(DatabaseDataType, Schema, TableSpec, ColumnName, Domain, OdbcValue, ApplicationValue)-> true ; otherwise-> ApplicationValue = OdbcValue ). % FIXME: What to do about this? catch_all(A, B, C):- catch(A, B, C). :-multifile(cql:process_database_events/1). cql_process_database_events(Events):- ignore(cql:process_database_events(Events)). :-multifile(cql:cql_transaction_info_hook/5). store_transaction_info(AccessToken, Connection, DBMS, Goal):- ( cql:cql_transaction_info_hook(AccessToken, Connection, DBMS, Goal, Info)-> true ; otherwise-> Info = {null} ), thread_self(ThreadId), assert(database_transaction_query_info(ThreadId, Goal, Info)). %% current_transaction_id(-TransactionId). current_transaction_id(TransactionId):- transaction_context(TransactionId, _, _, _).