rdbms: Allow PostgreSQL schema-check functions to find temporary tables
authorBrad Jorsch <bjorsch@wikimedia.org>
Sun, 18 Mar 2018 03:13:43 +0000 (23:13 -0400)
committerAaron Schulz <aschulz@wikimedia.org>
Thu, 5 Apr 2018 22:54:01 +0000 (22:54 +0000)
PostgreSQL puts temporary tables and such in a hidden, per-connection
"schema" that's checked for unqualified table accesses before the normal
search_path. We should check that in all the schema-checking functions.

Change-Id: I1194ac31f31133b177f624138afee19d00e454b9

includes/libs/rdbms/database/DatabasePostgres.php
includes/libs/rdbms/field/PostgresField.php

index 9c24787..32ea375 100644 (file)
@@ -43,6 +43,8 @@ class DatabasePostgres extends Database {
        private $connectString;
        /** @var string */
        private $coreSchema;
+       /** @var string */
+       private $tempSchema;
        /** @var string[] Map of (reserved table name => alternate table name) */
        private $keywordTableMap = [];
 
@@ -73,15 +75,17 @@ class DatabasePostgres extends Database {
        }
 
        public function hasConstraint( $name ) {
-               $conn = $this->getBindingHandle();
-
-               $sql = "SELECT 1 FROM pg_catalog.pg_constraint c, pg_catalog.pg_namespace n " .
-                       "WHERE c.connamespace = n.oid AND conname = '" .
-                       pg_escape_string( $conn, $name ) . "' AND n.nspname = '" .
-                       pg_escape_string( $conn, $this->getCoreSchema() ) . "'";
-               $res = $this->doQuery( $sql );
-
-               return $this->numRows( $res );
+               foreach ( $this->getCoreSchemas() as $schema ) {
+                       $sql = "SELECT 1 FROM pg_catalog.pg_constraint c, pg_catalog.pg_namespace n " .
+                               "WHERE c.connamespace = n.oid AND conname = " .
+                               $this->addQuotes( $name ) . " AND n.nspname = " .
+                               $this->addQuotes( $schema );
+                       $res = $this->doQuery( $sql );
+                       if ( $res && $this->numRows( $res ) ) {
+                               return true;
+                       }
+               }
+               return false;
        }
 
        public function open( $server, $user, $password, $dbName ) {
@@ -428,59 +432,65 @@ class DatabasePostgres extends Database {
 
        public function indexAttributes( $index, $schema = false ) {
                if ( $schema === false ) {
-                       $schema = $this->getCoreSchema();
-               }
-               /*
-                * A subquery would be not needed if we didn't care about the order
-                * of attributes, but we do
-                */
-               $sql = <<<__INDEXATTR__
-
-                       SELECT opcname,
-                               attname,
-                               i.indoption[s.g] as option,
-                               pg_am.amname
-                       FROM
-                               (SELECT generate_series(array_lower(isub.indkey,1), array_upper(isub.indkey,1)) AS g
-                                       FROM
-                                               pg_index isub
-                                       JOIN pg_class cis
-                                               ON cis.oid=isub.indexrelid
-                                       JOIN pg_namespace ns
-                                               ON cis.relnamespace = ns.oid
-                                       WHERE cis.relname='$index' AND ns.nspname='$schema') AS s,
-                               pg_attribute,
-                               pg_opclass opcls,
-                               pg_am,
-                               pg_class ci
-                               JOIN pg_index i
-                                       ON ci.oid=i.indexrelid
-                               JOIN pg_class ct
-                                       ON ct.oid = i.indrelid
-                               JOIN pg_namespace n
-                                       ON ci.relnamespace = n.oid
-                               WHERE
-                                       ci.relname='$index' AND n.nspname='$schema'
-                                       AND     attrelid = ct.oid
-                                       AND     i.indkey[s.g] = attnum
-                                       AND     i.indclass[s.g] = opcls.oid
-                                       AND     pg_am.oid = opcls.opcmethod
+                       $schemas = $this->getCoreSchemas();
+               } else {
+                       $schemas = [ $schema ];
+               }
+
+               $eindex = $this->addQuotes( $index );
+
+               foreach ( $schemas as $schema ) {
+                       $eschema = $this->addQuotes( $schema );
+                       /*
+                        * A subquery would be not needed if we didn't care about the order
+                        * of attributes, but we do
+                        */
+                       $sql = <<<__INDEXATTR__
+
+                               SELECT opcname,
+                                       attname,
+                                       i.indoption[s.g] as option,
+                                       pg_am.amname
+                               FROM
+                                       (SELECT generate_series(array_lower(isub.indkey,1), array_upper(isub.indkey,1)) AS g
+                                               FROM
+                                                       pg_index isub
+                                               JOIN pg_class cis
+                                                       ON cis.oid=isub.indexrelid
+                                               JOIN pg_namespace ns
+                                                       ON cis.relnamespace = ns.oid
+                                               WHERE cis.relname=$eindex AND ns.nspname=$eschema) AS s,
+                                       pg_attribute,
+                                       pg_opclass opcls,
+                                       pg_am,
+                                       pg_class ci
+                                       JOIN pg_index i
+                                               ON ci.oid=i.indexrelid
+                                       JOIN pg_class ct
+                                               ON ct.oid = i.indrelid
+                                       JOIN pg_namespace n
+                                               ON ci.relnamespace = n.oid
+                                       WHERE
+                                               ci.relname=$eindex AND n.nspname=$eschema
+                                               AND     attrelid = ct.oid
+                                               AND     i.indkey[s.g] = attnum
+                                               AND     i.indclass[s.g] = opcls.oid
+                                               AND     pg_am.oid = opcls.opcmethod
 __INDEXATTR__;
-               $res = $this->query( $sql, __METHOD__ );
-               $a = [];
-               if ( $res ) {
-                       foreach ( $res as $row ) {
-                               $a[] = [
-                                       $row->attname,
-                                       $row->opcname,
-                                       $row->amname,
-                                       $row->option ];
+                       $res = $this->query( $sql, __METHOD__ );
+                       $a = [];
+                       if ( $res ) {
+                               foreach ( $res as $row ) {
+                                       $a[] = [
+                                               $row->attname,
+                                               $row->opcname,
+                                               $row->amname,
+                                               $row->option ];
+                               }
+                               return $a;
                        }
-               } else {
-                       return null;
                }
-
-               return $a;
+               return null;
        }
 
        public function indexUnique( $table, $index, $fname = __METHOD__ ) {
@@ -784,9 +794,9 @@ __INDEXATTR__;
        }
 
        public function listTables( $prefix = null, $fname = __METHOD__ ) {
-               $eschema = $this->addQuotes( $this->getCoreSchema() );
+               $eschemas = implode( ',', array_map( [ $this, 'addQuotes' ], $this->getCoreSchemas() ) );
                $result = $this->query(
-                       "SELECT tablename FROM pg_tables WHERE schemaname = $eschema", $fname );
+                       "SELECT DISTINCT tablename FROM pg_tables WHERE schemaname IN ($eschemas)", $fname );
                $endArray = [];
 
                foreach ( $result as $table ) {
@@ -977,6 +987,29 @@ __INDEXATTR__;
                return $this->coreSchema;
        }
 
+       /**
+        * Return schema names for temporary tables and core application tables
+        *
+        * @since 1.31
+        * @return string[] schema names
+        */
+       public function getCoreSchemas() {
+               if ( $this->tempSchema ) {
+                       return [ $this->tempSchema, $this->getCoreSchema() ];
+               }
+
+               $res = $this->query(
+                       "SELECT nspname FROM pg_catalog.pg_namespace n WHERE n.oid = pg_my_temp_schema()", __METHOD__
+               );
+               $row = $this->fetchObject( $res );
+               if ( $row ) {
+                       $this->tempSchema = $row->nspname;
+                       return [ $this->tempSchema, $this->getCoreSchema() ];
+               }
+
+               return [ $this->getCoreSchema() ];
+       }
+
        public function getServerVersion() {
                if ( !isset( $this->numericVersion ) ) {
                        $conn = $this->getBindingHandle();
@@ -1009,18 +1042,24 @@ __INDEXATTR__;
                        $types = [ $types ];
                }
                if ( $schema === false ) {
-                       $schema = $this->getCoreSchema();
+                       $schemas = $this->getCoreSchemas();
+               } else {
+                       $schemas = [ $schema ];
                }
                $table = $this->realTableName( $table, 'raw' );
                $etable = $this->addQuotes( $table );
-               $eschema = $this->addQuotes( $schema );
-               $sql = "SELECT 1 FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n "
-                       . "WHERE c.relnamespace = n.oid AND c.relname = $etable AND n.nspname = $eschema "
-                       . "AND c.relkind IN ('" . implode( "','", $types ) . "')";
-               $res = $this->query( $sql );
-               $count = $res ? $res->numRows() : 0;
+               foreach ( $schemas as $schema ) {
+                       $eschema = $this->addQuotes( $schema );
+                       $sql = "SELECT 1 FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n "
+                               . "WHERE c.relnamespace = n.oid AND c.relname = $etable AND n.nspname = $eschema "
+                               . "AND c.relkind IN ('" . implode( "','", $types ) . "')";
+                       $res = $this->query( $sql );
+                       if ( $res && $res->numRows() ) {
+                               return true;
+                       }
+               }
 
-               return (bool)$count;
+               return false;
        }
 
        /**
@@ -1045,20 +1084,21 @@ __INDEXATTR__;
                        AND tgrelid=pg_class.oid
                        AND nspname=%s AND relname=%s AND tgname=%s
 SQL;
-               $res = $this->query(
-                       sprintf(
-                               $q,
-                               $this->addQuotes( $this->getCoreSchema() ),
-                               $this->addQuotes( $table ),
-                               $this->addQuotes( $trigger )
-                       )
-               );
-               if ( !$res ) {
-                       return null;
+               foreach ( $this->getCoreSchemas() as $schema ) {
+                       $res = $this->query(
+                               sprintf(
+                                       $q,
+                                       $this->addQuotes( $schema ),
+                                       $this->addQuotes( $table ),
+                                       $this->addQuotes( $trigger )
+                               )
+                       );
+                       if ( $res && $res->numRows() ) {
+                               return true;
+                       }
                }
-               $rows = $res->numRows();
 
-               return $rows;
+               return false;
        }
 
        public function ruleExists( $table, $rule ) {
@@ -1066,7 +1106,7 @@ SQL;
                        [
                                'rulename' => $rule,
                                'tablename' => $table,
-                               'schemaname' => $this->getCoreSchema()
+                               'schemaname' => $this->getCoreSchemas()
                        ]
                );
 
@@ -1074,19 +1114,19 @@ SQL;
        }
 
        public function constraintExists( $table, $constraint ) {
-               $sql = sprintf( "SELECT 1 FROM information_schema.table_constraints " .
-                       "WHERE constraint_schema = %s AND table_name = %s AND constraint_name = %s",
-                       $this->addQuotes( $this->getCoreSchema() ),
-                       $this->addQuotes( $table ),
-                       $this->addQuotes( $constraint )
-               );
-               $res = $this->query( $sql );
-               if ( !$res ) {
-                       return null;
+               foreach ( $this->getCoreSchemas() as $schema ) {
+                       $sql = sprintf( "SELECT 1 FROM information_schema.table_constraints " .
+                               "WHERE constraint_schema = %s AND table_name = %s AND constraint_name = %s",
+                               $this->addQuotes( $schema ),
+                               $this->addQuotes( $table ),
+                               $this->addQuotes( $constraint )
+                       );
+                       $res = $this->query( $sql );
+                       if ( $res && $res->numRows() ) {
+                               return true;
+                       }
                }
-               $rows = $res->numRows();
-
-               return $rows;
+               return false;
        }
 
        /**
index 600f34a..53c3d33 100644 (file)
@@ -38,30 +38,34 @@ AND attname=%s;
 SQL;
 
                $table = $db->remappedTableName( $table );
-               $res = $db->query(
-                       sprintf( $q,
-                               $db->addQuotes( $db->getCoreSchema() ),
-                               $db->addQuotes( $table ),
-                               $db->addQuotes( $field )
-                       )
-               );
-               $row = $db->fetchObject( $res );
-               if ( !$row ) {
-                       return null;
+               foreach ( $db->getCoreSchemas() as $schema ) {
+                       $res = $db->query(
+                               sprintf( $q,
+                                       $db->addQuotes( $schema ),
+                                       $db->addQuotes( $table ),
+                                       $db->addQuotes( $field )
+                               )
+                       );
+                       $row = $db->fetchObject( $res );
+                       if ( !$row ) {
+                               continue;
+                       }
+                       $n = new PostgresField;
+                       $n->type = $row->typname;
+                       $n->nullable = ( $row->attnotnull == 'f' );
+                       $n->name = $field;
+                       $n->tablename = $table;
+                       $n->max_length = $row->attlen;
+                       $n->deferrable = ( $row->deferrable == 't' );
+                       $n->deferred = ( $row->deferred == 't' );
+                       $n->conname = $row->conname;
+                       $n->has_default = ( $row->atthasdef === 't' );
+                       $n->default = $row->adsrc;
+
+                       return $n;
                }
-               $n = new PostgresField;
-               $n->type = $row->typname;
-               $n->nullable = ( $row->attnotnull == 'f' );
-               $n->name = $field;
-               $n->tablename = $table;
-               $n->max_length = $row->attlen;
-               $n->deferrable = ( $row->deferrable == 't' );
-               $n->deferred = ( $row->deferred == 't' );
-               $n->conname = $row->conname;
-               $n->has_default = ( $row->atthasdef === 't' );
-               $n->default = $row->adsrc;
 
-               return $n;
+               return null;
        }
 
        function name() {