summaryrefslogtreecommitdiff
path: root/vendor/adodb/adodb-php/replicate/adodb-replicate.inc.php
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/adodb/adodb-php/replicate/adodb-replicate.inc.php')
-rw-r--r--vendor/adodb/adodb-php/replicate/adodb-replicate.inc.php1180
1 files changed, 1180 insertions, 0 deletions
diff --git a/vendor/adodb/adodb-php/replicate/adodb-replicate.inc.php b/vendor/adodb/adodb-php/replicate/adodb-replicate.inc.php
new file mode 100644
index 0000000..f13e4af
--- /dev/null
+++ b/vendor/adodb/adodb-php/replicate/adodb-replicate.inc.php
@@ -0,0 +1,1180 @@
+<?php
+
+define('ADODB_REPLICATE',1.2);
+
+include_once(ADODB_DIR.'/adodb-datadict.inc.php');
+
+/*
+1.2 9 June 2009
+Minor patches
+
+1.1 8 June 2009
+Added $lastUpdateFld to replicatedata
+Added $rep->compat. If compat set to 1.0, then $lastUpdateFld not used during MergeData.
+
+1.0 Apr 2009
+Added support for MFFA
+
+0.9 ? 2008
+First release
+
+
+ Note: this code assumes that comments such as / * * / ar`e allowed which works with:
+ Note: this code assumes that comments such as / * * / are allowed which works with:
+ mssql, postgresql, oracle, mssql
+
+ Replication engine to
+ - copy table structures and data from different databases (e.g. mysql to oracle)
+ for replication purposes
+ - generate CREATE TABLE, CREATE INDEX, INSERT ... for installation scripts
+
+ Table Structure copying includes
+ - fields and limited subset of types
+ - optional default values
+ - indexes
+ - but not constraints
+
+
+ Two modes of data copy:
+
+ ReplicateData
+ - Copy from src to dest, with update of status of copy back to src,
+ with configurable src SELECT where clause
+
+ MergeData
+ - Copy from src to dest based on last mod date field and/or copied flag field
+
+ Default settings are
+ - do not execute, generate sql ($rep->execute = false)
+ - do not delete records in dest table first ($rep->deleteFirst = false).
+ if $rep->deleteFirst is true and primary keys are defined,
+ then no deletion will occur unless *INSERTONLY* is defined in pkey array
+ - only commit once at the end of every ReplicateData ($rep->commitReplicate = true)
+ - do not autocommit every x records processed ($rep->commitRecs = -1)
+ - even if error occurs on one record, continue copying remaining records ($rep->neverAbort = true)
+ - debugging turned off ($rep->debug = false)
+*/
+
+class ADODB_Replicate {
+ var $connSrc;
+ var $connDest;
+
+ var $connSrc2 = false;
+ var $connDest2 = false;
+ var $ddSrc;
+ var $ddDest;
+
+ var $execute = false;
+ var $debug = false;
+ var $deleteFirst = false;
+ var $commitReplicate = true; // commit at end of replicatedata
+ var $commitRecs = -1; // only commit at end of ReplicateData()
+
+ var $selFilter = false;
+ var $fieldFilter = false;
+ var $indexFilter = false;
+ var $updateFilter = false;
+ var $insertFilter = false;
+ var $updateSrcFn = false;
+
+ var $limitRecs = false;
+
+ var $neverAbort = true;
+ var $copyTableDefaults = false; // turn off because functions defined as defaults will not work when copied
+ var $errHandler = false; // name of error handler function, if used.
+ var $htmlSpecialChars = true; // if execute false, then output with htmlspecialchars enabled.
+ // Will autoconfigure itself. No need to modify
+
+ var $trgSuffix = '_mrgTr';
+ var $idxSuffix = '_mrgidx';
+ var $trLogic = '1 = 1';
+ var $datesAreTimeStamps = false;
+
+ var $oracleSequence = false;
+ var $readUncommitted = false; // read without obeying shared locks for fast select (mssql)
+
+ var $compat = false;
+ // connSrc2 and connDest2 are only required if the db driver
+ // does not allow updates back to src db in first connection (the select connection),
+ // so we need 2nd connection
+ function __construct($connSrc, $connDest, $connSrc2=false, $connDest2=false)
+ {
+
+ if (strpos($connSrc->databaseType,'odbtp') !== false) {
+ $connSrc->_bindInputArray = false; # bug in odbtp, binding fails
+ }
+
+ if (strpos($connDest->databaseType,'odbtp') !== false) {
+ $connDest->_bindInputArray = false; # bug in odbtp, binding fails
+ }
+
+ $this->connSrc = $connSrc;
+ $this->connDest = $connDest;
+
+ $this->connSrc2 = ($connSrc2) ? $connSrc2 : $connSrc;
+ $this->connDest2 = ($connDest2) ? $connDest2 : $connDest;
+
+ $this->ddSrc = NewDataDictionary($connSrc);
+ $this->ddDest = NewDataDictionary($connDest);
+ $this->htmlSpecialChars = isset($_SERVER['HTTP_HOST']);
+ }
+
+ function ExecSQL($sql)
+ {
+ if (!is_array($sql)) $sql[] = $sql;
+
+ $ret = true;
+ foreach($sql as $s)
+ if (!$this->execute) echo "<pre>",$s.";\n</pre>";
+ else {
+ $ok = $this->connDest->Execute($s);
+ if (!$ok)
+ if ($this->neverAbort) $ret = false;
+ else return false;
+ }
+
+ return $ret;
+ }
+
+ /*
+ We assume replication between $table and $desttable only works if the field names and types match for both tables.
+
+ Also $table and desttable can have different names.
+ */
+
+ function CopyTableStruct($table,$desttable='')
+ {
+ $sql = $this->CopyTableStructSQL($table,$desttable);
+ if (empty($sql)) return false;
+ return $this->ExecSQL($sql);
+ }
+
+ function RunFieldFilter(&$fld, $mode = '')
+ {
+ if ($this->fieldFilter) {
+ $fn = $this->fieldFilter;
+ return $fn($fld, $mode);
+ } else
+ return $fld;
+ }
+
+ function RunUpdateFilter($table, $fld, $val)
+ {
+ if ($this->updateFilter) {
+ $fn = $this->updateFilter;
+ return $fn($table, $fld, $val);
+ } else
+ return $val;
+ }
+
+ function RunInsertFilter($table, $fld, &$val)
+ {
+ if ($this->insertFilter) {
+ $fn = $this->insertFilter;
+ return $fn($table, $fld, $val);
+ } else
+ return $fld;
+ }
+
+ /*
+ $mode = INS or UPD
+
+ The lastUpdateFld holds the field that counts the number of updates or the date of last mod. This ensures that
+ if the rec was modified after replicatedata retrieves the data but before we update back the src record,
+ we don't set the copiedflag='Y' yet.
+ */
+ function RunUpdateSrcFn($srcdb, $table, $fldoffsets, $row, $where, $mode, $dest_insertid=null, $lastUpdateFld='')
+ {
+ if (!$this->updateSrcFn) return;
+
+ $bindarr = array();
+ foreach($fldoffsets as $k) {
+ $bindarr[$k] = $row[$k];
+ }
+ $last = sizeof($row);
+
+ if ($lastUpdateFld && $row[$last-1]) {
+ $ds = $row[$last-1];
+ if (strpos($ds,':') !== false) $s = $srcdb->DBTimeStamp($ds);
+ else $s = $srcdb->qstr($ds);
+ $where = "WHERE $lastUpdateFld = $s and $where";
+ } else
+ $where = "WHERE $where";
+ $fn = $this->updateSrcFn;
+ if (is_array($fn)) {
+ if (sizeof($fn) == 1) $set = reset($fn);
+ else $set = @$fn[$mode];
+ if ($set) {
+
+ if (strlen($dest_insertid) == 0) $dest_insertid = 'null';
+ $set = str_replace('$INSERT_ID',$dest_insertid,$set);
+
+ $sql = "UPDATE $table SET $set $where ";
+ $ok = $srcdb->Execute($sql,$bindarr);
+ if (!$ok) {
+ echo $srcdb->ErrorMsg(),"<br>\n";
+ die();
+ }
+ }
+ } else $fn($srcdb, $table, $row, $where, $bindarr, $mode, $dest_insertid);
+
+ }
+
+ function CopyTableStructSQL($table, $desttable='',$dropdest =false)
+ {
+ if (!$desttable) {
+ $desttable = $table;
+ $prefixidx = '';
+ } else
+ $prefixidx = $desttable;
+
+ $conn = $this->connSrc;
+ $types = $conn->MetaColumns($table);
+ if (!$types) {
+ echo "$table does not exist in source db<br>\n";
+ return array();
+ }
+ if (!$dropdest && $this->connDest->MetaColumns($desttable)) {
+ echo "$desttable already exists in dest db<br>\n";
+ return array();
+ }
+ if ($this->debug) var_dump($types);
+ $sa = array();
+ $idxcols = array();
+
+ foreach($types as $name => $t) {
+ $s = '';
+ $mt = $this->ddSrc->MetaType($t->type);
+ $len = $t->max_length;
+ $fldname = $this->RunFieldFilter($t->name,'TABLE');
+ if (!$fldname) continue;
+
+ $s .= $fldname . ' '.$mt;
+ if (isset($t->scale)) $precision = '.'.$t->scale;
+ else $precision = '';
+ if ($mt == 'C' or $mt == 'X') $s .= "($len)";
+ else if ($mt == 'N' && $precision) $s .= "($len$precision)";
+
+ if ($mt == 'R') $idxcols[] = $fldname;
+
+ if ($this->copyTableDefaults) {
+ if (isset($t->default_value)) {
+ $v = $t->default_value;
+ if ($mt == 'C' or $mt == 'X') $v = $this->connDest->qstr($v); // might not work as this could be function
+ $s .= ' DEFAULT '.$v;
+ }
+ }
+
+ $sa[] = $s;
+ }
+
+ $s = implode(",\n",$sa);
+
+ // dump adodb intermediate data dictionary format
+ if ($this->debug) echo '<pre>'.$s.'</pre>';
+
+ $sqla = $this->ddDest->CreateTableSQL($desttable,$s);
+
+ /*
+ if ($idxcols) {
+ $idxoptions = array('UNIQUE'=>1);
+ $sqla2 = $this->ddDest->_IndexSQL($table.'_'.$fldname.'_SERIAL', $desttable, $idxcols,$idxoptions);
+ $sqla = array_merge($sqla,$sqla2);
+ }*/
+
+ $idxs = $conn->MetaIndexes($table);
+ if ($idxs)
+ foreach($idxs as $name => $iarr) {
+ $idxoptions = array();
+ $fldnames = array();
+
+ if(!empty($iarr['unique'])) {
+ $idxoptions['UNIQUE'] = 1;
+ }
+
+ foreach($iarr['columns'] as $fld) {
+ $fldnames[] = $this->RunFieldFilter($fld,'TABLE');
+ }
+
+ $idxname = $prefixidx.str_replace($table,$desttable,$name);
+
+ if (!empty($this->indexFilter)) {
+ $fn = $this->indexFilter;
+ $idxname = $fn($desttable,$idxname,$fldnames,$idxoptions);
+ }
+ $sqla2 = $this->ddDest->_IndexSQL($idxname, $desttable, $fldnames,$idxoptions);
+ $sqla = array_merge($sqla,$sqla2);
+ }
+
+ return $sqla;
+ }
+
+ function _clearcache()
+ {
+
+ }
+
+ function _concat($v)
+ {
+ return $this->connDest->concat("' ","chr(".ord($v).")","'");
+ }
+
+ function fixupbinary($v)
+ {
+ return str_replace(
+ array("\r","\n"),
+ array($this->_concat("\r"),$this->_concat("\n")),
+ $v );
+ }
+
+ function SwapDBs()
+ {
+ $o = $this->connSrc;
+ $this->connSrc = $this->connDest;
+ $this->connDest = $o;
+
+
+ $o = $this->connSrc2;
+ $this->connSrc2 = $this->connDest2;
+ $this->connDest2 = $o;
+
+ $o = $this->ddSrc;
+ $this->ddSrc = $this->ddDest;
+ $this->ddDest = $o;
+ }
+
+ /*
+ // if no uniqflds defined, then all desttable recs will be deleted before insert
+ // $where clause must include the WHERE word if used
+ // if $this->commitRecs is set to a +ve value, then it will autocommit every $this->commitRecs records
+ // -- this should never be done with 7x24 db's
+
+ Returns an array:
+ $arr[0] = true if no error, false if error
+ $arr[1] = number of recs processed
+ $arr[2] = number of successful inserts
+ $arr[3] = number of successful updates
+
+ ReplicateData() params:
+
+ $table = src table name
+ $desttable = dest table name, leave blank to use src table name
+ $uniqflds = array() = an array. If set, then inserts and updates will occur. eg. array('PK1', 'PK2');
+ To prevent updates to desttable (allow only to src table), add '*INSERTONLY*' or '*ONLYINSERT*' to array.
+
+ Sometimes you are replicating a src table with an autoinc primary key.
+ You sometimes create recs in the dest table. The dest table has to retrieve the
+ src table's autoinc key (stored in a 2nd field) so you can match the two tables.
+
+ To define this, and the uniqflds contains nested arrays. Copying from autoinc table to other table:
+ array(array($destpkey), array($destfld_holds_src_autoinc_pkey))
+
+ Copying from normal table to autoinc table:
+ array(array($destpkey), array(), array($srcfld_holds_dest_autoinc_pkey))
+
+ $where = where clause for SELECT from $table $where. Include the WHERE reserved word in beginning.
+ You can put ORDER BY at the end also
+ $ignoreflds = array(), list of fields to ignore. e.g. array('FLD1',FLD2');
+ $dstCopyDateFld = date field on $desttable to update with current date
+ $extraflds allows you to add additional flds to insert/update. Format
+ array(fldname => $fldval)
+ $fldval itself can be an array or a string. If an array, then
+ $extraflds = array($fldname => array($insertval, $updateval))
+
+ Thus we have the following behaviours:
+
+ a. Delete all data in $desttable then insert from src $table
+
+ $rep->execute = true;
+ $rep->ReplicateData($table, $desttable)
+
+ b. Update $desttable if record exists (based on $uniqflds), otherwise insert.
+
+ $rep->execute = true;
+ $rep->ReplicateData($table, $desttable, $array($pkey1, $pkey2))
+
+ c. Select from src $table all data modified since a date. Then update $desttable
+ if record exists (based on $uniqflds), otherwise insert
+
+ $rep->execute = true;
+ $rep->ReplicateData($table, $desttable, array($pkey1, $pkey2), "WHERE update_datetime_fld > $LAST_REFRESH")
+
+ d. Insert all records into $desttable modified after a certain id (or time) in src $table:
+
+ $rep->execute = true;
+ $rep->ReplicateData($table, $desttable, false, "WHERE id_fld > $LAST_ID_SAVED", true);
+
+
+ For (a) to (d), returns array: array($boolean_ok_fail, $no_recs_selected_from_src_db, $no_recs_inserted, $no_recs_updated);
+
+ e. Generate sample SQL:
+
+ $rep->execute = false;
+ $rep->ReplicateData(....);
+
+ This returns $array, which contains:
+
+ $array['SEL'] = select stmt from src db
+ $array['UPD'] = update stmt to dest db
+ $array['INS'] = insert stmt to dest db
+
+
+ Error-handling
+ ==============
+ Default is never abort if error occurs. You can set $rep->neverAbort = false; to force replication to abort if an error occurs.
+
+
+ Value Filtering
+ ========
+ Sometimes you might need to modify/massage the data before the code works. Assume that the value used for True and False is
+ 'T' and 'F' in src DB, but is 'Y' and 'N' in dest DB for field[2] in select stmt. You can do this by
+
+ $rep->filterSelect = 'filter';
+ $rep->ReplicateData(...);
+
+ function filter($table,& $fields, $deleteFirst)
+ {
+ if ($table == 'SOMETABLE') {
+ if ($fields[2] == 'T') $fields[2] = 'Y';
+ else if ($fields[2] == 'F') $fields[2] = 'N';
+ }
+ }
+
+ We pass in $deleteFirst as that determines the order of the fields (which are numeric-based):
+ TRUE: the order of fields matches the src table order
+ FALSE: the order of fields is all non-primary key fields first, followed by primary key fields. This is because it needs
+ to match the UPDATE statement, which is UPDATE $table SET f2 = ?, f3 = ? ... WHERE f1 = ?
+
+ Name Filtering
+ =========
+ Sometimes field names that are legal in one RDBMS can be illegal in another.
+ We allow you to handle this using a field filter.
+ Also if you don't want to replicate certain fields, just return false.
+
+ $rep->fieldFilter = 'ffilter';
+
+ function ffilter(&$fld,$mode)
+ {
+ $uf = strtoupper($fld);
+ switch($uf) {
+ case 'GROUP':
+ if ($mode == 'SELECT') $fld = '"Group"';
+ return 'GroupFld';
+
+ case 'PRIVATEFLD': # do not replicate
+ return false;
+ }
+ return $fld;
+ }
+
+
+ UPDATE FILTERING
+ ================
+ Sometimes, when we want to update
+ UPDATE table SET fld = val WHERE ....
+
+ we want to modify val. To do so, define
+
+ $rep->updateFilter = 'ufilter';
+
+ function ufilter($table, $fld, $val)
+ {
+ return "nvl($fld, $val)";
+ }
+
+
+ Sending back audit info back to src Table
+ =========================================
+
+ Use $rep->updateSrcFn. This can be an array of strings, or the name of a php function to call.
+
+ If an array of strings is defined, then it will perform an update statement...
+
+ UPDATE srctable SET $string WHERE ....
+
+ With $string set to the array you define. If a new record was inserted into desttable, then the
+ 'INS' string is used ($INSERT_ID will be replaced with the real INSERT_ID, if any),
+ and if an update then use the 'UPD' string.
+
+ array(
+ 'INS' => 'insertid = $INSERT_ID, copieddate=getdate(), copied = 1',
+ 'UPD' => 'copieddate=getdate(), copied = 1'
+ )
+
+ If a single string array is defined, then it will be used for both insert and update.
+ array('copieddate=getdate(), copied = 1')
+
+ Note that the where clause is automatically defined by the system.
+
+ If $rep->updateSrcFn is a PHP function name, then it will be called with the following params:
+
+ $fn($srcConnection, $tableName, $row, $where, $bindarr, $mode, $dest_insertid)
+
+ $srcConnection - source db connection
+ $tableName - source tablename
+ $row - array holding records updated into dest
+ $where - where clause to be used (uses bind vars)
+ $bindarr - array holding bind variables for where clause
+ $mode - INS or UPD
+ $dest_insertid - when mode=INS, then the insert_id is stored here.
+
+
+ oracle mssql
+ ---> insert
+ mssqlid <--- insert_id
+ ----> update with mssqlid
+ <---- update with mssqlid
+
+
+ TODO: add src pkey and dest pkey for updates. Also sql stmt needs to be tuned, so dest pkey, src pkey
+ */
+
+
+ function ReplicateData($table, $desttable = '', $uniqflds = array(), $where = '',$ignore_flds = array(),
+ $dstCopyDateFld='', $extraflds = array(), $lastUpdateFld = '')
+ {
+ if (is_array($where)) {
+ $wheresrc = $where[0];
+ $wheredest = $where[1];
+ } else {
+ $wheresrc = $wheredest = $where;
+ }
+ $dstCopyDateName = $dstCopyDateFld;
+ $dstCopyDateFld = strtoupper($dstCopyDateFld);
+
+ $this->_clearcache();
+ if (is_string($uniqflds) && strlen($uniqflds)) $uniqflds = array($uniqflds);
+ if (!$desttable) $desttable = $table;
+
+ $uniq = array();
+ if ($uniqflds) {
+ if (is_array(reset($uniqflds))) {
+ /*
+ primary key of src and dest tables differ. This means when we perform the select stmts
+ we retrieve both keys. Then any insert statement will have to ignore one array element.
+ Any update statement will need to use a different where clause
+ */
+ $destuniqflds = $uniqflds[0];
+ if (sizeof($uniqflds)>1 && $uniqflds[1]) // srckey field name in dest table
+ $srcuniqflds = $uniqflds[1];
+ else
+ $srcuniqflds = array();
+
+ if (sizeof($uniqflds)>2)
+ $srcPKDest = reset($uniqflds[2]);
+
+ } else {
+ $destuniqflds = $uniqflds;
+ $srcuniqflds = array();
+ }
+ $onlyInsert = false;
+ foreach($destuniqflds as $k => $u) {
+ if ($u == '*INSERTONLY*' || $u == '*ONLYINSERT*') {
+ $onlyInsert = true;
+ continue;
+ }
+ $uniq[strtoupper($u)] = $k;
+ }
+ $deleteFirst = $this->deleteFirst;
+ } else {
+ $deleteFirst = true;
+ }
+
+ if ($deleteFirst) $onlyInsert = true;
+
+ if ($ignore_flds) {
+ foreach($ignore_flds as $u) {
+ $ignoreflds[strtoupper($u)] = 1;
+ }
+ } else
+ $ignoreflds = array();
+
+ $src = $this->connSrc;
+ $dest = $this->connDest;
+ $src2 = $this->connSrc2;
+
+ $dest->noNullStrings = false;
+ $src->noNullStrings = false;
+ $src2->noNullStrings = false;
+
+ if ($src === $dest) $this->execute = false;
+
+ $types = $src->MetaColumns($table);
+ if (!$types) {
+ echo "Source $table does not exist<br>\n";
+ return array();
+ }
+ $dtypes = $dest->MetaColumns($desttable);
+ if (!$dtypes) {
+ echo "Destination $desttable does not exist<br>\n";
+ return array();
+ }
+ $sa = array();
+ $selflds = array();
+ $wheref = array();
+ $wheres = array();
+ $srcwheref = array();
+ $fldoffsets = array();
+ $k = 0;
+ foreach($types as $name => $t) {
+ $name2 = strtoupper($this->RunFieldFilter($name,'SELECT'));
+ // handle quotes
+ if ($name2 && $name2[0] == '"' && $name2[strlen($name2)-1] == '"') $name22 = substr($name2,1,strlen($name2)-2);
+ elseif ($name2 && $name2[0] == '`' && $name2[strlen($name2)-1] == '`') $name22 = substr($name2,1,strlen($name2)-2);
+ else $name22 = $name2;
+
+ //else $name22 = $name2; // this causes problem for quotes strip above
+
+ if (!isset($dtypes[($name22)]) || !$name2) {
+ if ($this->debug) echo " Skipping $name ==> $name2 as not in destination $desttable<br>";
+ continue;
+ }
+
+ if ($name2 == $dstCopyDateFld) {
+ $dstCopyDateName = $t->name;
+ continue;
+ }
+
+ $fld = $t->name;
+ $fldval = $t->name;
+ $mt = $src->MetaType($t->type);
+ if ($this->datesAreTimeStamps && $mt == 'D') $mt = 'T';
+ if ($mt == 'D') $fldval = $dest->DBDate($fldval);
+ elseif ($mt == 'T') $fldval = $dest->DBTimeStamp($fldval);
+ $ufld = strtoupper($fld);
+
+ if (isset($ignoreflds[($name2)]) && !isset($uniq[$ufld])) {
+ continue;
+ }
+
+ if ($this->debug) echo " field=$fld type=$mt fldval=$fldval<br>";
+
+ if (!isset($uniq[$ufld])) {
+
+ $selfld = $fld;
+ $fld = $this->RunFieldFilter($selfld,'SELECT');
+ $selflds[] = $selfld;
+
+ $p = $dest->Param($k);
+
+ if ($mt == 'D') $p = $dest->DBDate($p, true);
+ else if ($mt == 'T') $p = $dest->DBTimeStamp($p, true);
+
+ # UPDATES
+ $sets[] = "$fld = ".$this->RunUpdateFilter($desttable, $fld, $p);
+
+ # INSERTS
+ $insflds[] = $this->RunInsertFilter($desttable,$fld, $p); $params[] = $p;
+ $k++;
+ } else {
+ $fld = $this->RunFieldFilter($fld);
+ $wheref[] = $fld;
+ if (!empty($srcuniqflds)) $srcwheref[] = $srcuniqflds[$uniq[$ufld]];
+ if ($mt == 'C') { # normally we don't include the primary key in the insert if it is numeric, but ok if varchar
+ $insertpkey = true;
+ }
+ }
+ }
+
+
+ foreach($extraflds as $fld => $evals) {
+ if (!is_array($evals)) $evals = array($evals, $evals);
+ $insflds[] = $this->RunInsertFilter($desttable,$fld, $p); $params[] = $evals[0];
+ $sets[] = "$fld = ".$evals[1];
+ }
+
+ if ($dstCopyDateFld) {
+ $sets[] = "$dstCopyDateName = ".$dest->sysTimeStamp;
+ $insflds[] = $this->RunInsertFilter($desttable,$dstCopyDateName, $p); $params[] = $dest->sysTimeStamp;
+ }
+
+
+ if (!empty($srcPKDest)) {
+ $selflds[] = $srcPKDest;
+ $fldoffsets = array($k+1);
+ }
+
+ foreach($wheref as $uu => $fld) {
+
+ $p = $dest->Param($k);
+ $sp = $src->Param($k);
+ if (!empty($srcuniqflds)) {
+ if ($uu > 1) die("Only one primary key for srcuniqflds allowed currently");
+ $destsrckey = reset($srcuniqflds);
+ $wheres[] = reset($srcuniqflds).' = '.$p;
+
+ $insflds[] = $this->RunInsertFilter($desttable,$destsrckey, $p);
+ $params[] = $p;
+ } else {
+ $wheres[] = $fld.' = '.$p;
+ if (!isset($ignoreflds[strtoupper($fld)]) || !empty($insertpkey)) {
+ $insflds[] = $this->RunInsertFilter($desttable,$fld, $p);
+ $params[] = $p;
+ }
+ }
+
+ $selflds[] = $fld;
+ $srcwheres[] = $fld.' = '.$sp;
+ $fldoffsets[] = $k;
+
+ $k++;
+ }
+
+ if (!empty($srcPKDest)) {
+ $fldoffsets = array($k);
+ $srcwheres = array($fld.'='.$src->Param($k));
+ $k++;
+ }
+
+ if ($lastUpdateFld) {
+ $selflds[] = $lastUpdateFld;
+ } else
+ $selflds[] = 'null as Z55_DUMMY_LA5TUPD';
+
+ $insfldss = implode(', ', $insflds);
+ $fldss = implode(', ', $selflds);
+ $setss = implode(', ', $sets);
+ $paramss = implode(', ', $params);
+ $wheress = implode(' AND ', $wheres);
+ if (isset($srcwheres))
+ $srcwheress = implode(' AND ',$srcwheres);
+
+
+ $seltable = $table;
+ if ($this->readUncommitted && strpos($src->databaseType,'mssql')) $seltable .= ' with (NOLOCK)';
+
+ $sa['SEL'] = "SELECT $fldss FROM $seltable $wheresrc";
+ $sa['INS'] = "INSERT INTO $desttable ($insfldss) VALUES ($paramss) /**INS**/";
+ $sa['UPD'] = "UPDATE $desttable SET $setss WHERE $wheress /**UPD**/";
+
+
+
+ $DB1 = "/* <font color=green> Source DB - sample sql in case you need to adapt code\n\n";
+ $DB2 = "/* <font color=green> Dest DB - sample sql in case you need to adapt code\n\n";
+
+ if (!$this->execute) echo '/*<style>
+pre {
+white-space: pre-wrap; /* css-3 */
+white-space: -moz-pre-wrap !important; /* Mozilla, since 1999 */
+white-space: -pre-wrap; /* Opera 4-6 */
+white-space: -o-pre-wrap; /* Opera 7 */
+word-wrap: break-word; /* Internet Explorer 5.5+ */
+}
+</style><pre>*/
+';
+ if ($deleteFirst && $this->deleteFirst) {
+ $where = preg_replace('/[ \n\r\t]+order[ \n\r\t]+by.*$/i', '', $where);
+ $sql = "DELETE FROM $desttable $wheredest\n";
+ if (!$this->execute) echo $DB2,'</font>*/',$sql,"\n";
+ else $dest->Execute($sql);
+ }
+
+ global $ADODB_COUNTRECS;
+ $err = false;
+ $savemode = $src->setFetchMode(ADODB_FETCH_NUM);
+ $ADODB_COUNTRECS = false;
+
+ if (!$this->execute) {
+ echo $DB1,$sa['SEL'],"</font>\n*/\n\n";
+ echo $DB2,$sa['INS'],"</font>\n*/\n\n";
+ $suffix = ($onlyInsert) ? ' PRIMKEY=?' : '';
+ echo $DB2,$sa['UPD'],"$suffix</font>\n*/\n\n";
+
+ $rs = $src->Execute($sa['SEL']);
+ $cnt = 1;
+ $upd = 0;
+ $ins = 0;
+
+ $sqlarr = explode('?',$sa['INS']);
+ $nparams = sizeof($sqlarr)-1;
+
+ $useQmark = $dest && ($dest->dataProvider != 'oci8');
+
+ while ($rs && !$rs->EOF) {
+ if ($useQmark) {
+ $sql = ''; $i = 0;
+ $arr = array_reverse($rs->fields);
+ foreach ($arr as $v) {
+ $sql .= $sqlarr[$i];
+ // from Ron Baldwin <ron.baldwin#sourceprose.com>
+ // Only quote string types
+ $typ = gettype($v);
+ if ($typ == 'string')
+ //New memory copy of input created here -mikefedyk
+ $sql .= $dest->qstr($v);
+ else if ($typ == 'double')
+ $sql .= str_replace(',','.',$v); // locales fix so 1.1 does not get converted to 1,1
+ else if ($typ == 'boolean')
+ $sql .= $v ? $dest->true : $dest->false;
+ else if ($typ == 'object') {
+ if (method_exists($v, '__toString')) $sql .= $dest->qstr($v->__toString());
+ else $sql .= $dest->qstr((string) $v);
+ } else if ($v === null)
+ $sql .= 'NULL';
+ else
+ $sql .= $v;
+ $i += 1;
+
+ if ($i == $nparams) break;
+ } // while
+
+ if (isset($sqlarr[$i])) {
+ $sql .= $sqlarr[$i];
+ }
+ $INS = $sql;
+ } else {
+ $INS = $sa['INS'];
+ $arr = array_reverse($rs->fields);
+ foreach($arr as $k => $v) { // only works on oracle currently
+ $k = sizeof($arr)-$k-1;
+ $v = str_replace(":","%~%COLON%!%",$v);
+ $INS = str_replace(':'.$k,$this->fixupbinary($dest->qstr($v)),$INS);
+ }
+ $INS = str_replace("%~%COLON%!%",":",$INS);
+ if ($this->htmlSpecialChars) $INS = htmlspecialchars($INS);
+ }
+ echo "-- $cnt\n",$INS,";\n\n";
+ $cnt += 1;
+ $ins += 1;
+ $rs->MoveNext();
+ }
+ $src->setFetchMode($savemode);
+ return $sa;
+ } else {
+ $saved = $src->debug;
+ #$src->debug=1;
+ if ($this->limitRecs>100)
+ $rs = $src->SelectLimit($sa['SEL'],$this->limitRecs);
+ else
+ $rs = $src->Execute($sa['SEL']);
+ $src->debug = $saved;
+ if (!$rs) {
+ if ($this->errHandler) $this->_doerr('SEL',array());
+ return array(0,0,0,0);
+ }
+
+
+ if ($this->commitReplicate || $commitRecs > 0) {
+ $dest->BeginTrans();
+ if ($this->updateSrcFn) $src2->BeginTrans();
+ }
+
+ if ($this->updateSrcFn && strpos($src2->databaseType,'mssql') !== false) {
+ # problem is writers interfere with readers in mssql
+ $rs = $src->_rs2rs($rs);
+ }
+ $cnt = 0;
+ $upd = 0;
+ $ins = 0;
+
+ $sizeofrow = sizeof($selflds);
+
+ $fn = $this->selFilter;
+ $commitRecs = $this->commitRecs;
+
+ $saved = $dest->debug;
+
+ if ($this->deleteFirst) $onlyInsert = true;
+ while ($origrow = $rs->FetchRow()) {
+
+ if ($dest->debug) {flush(); @ob_flush();}
+
+ if ($fn) {
+ if (!$fn($desttable, $origrow, $deleteFirst, $this, $selflds)) continue;
+ }
+ $doinsert = true;
+ $row = array_slice($origrow,0,$sizeofrow-1);
+
+ if (!$onlyInsert) {
+ $doinsert = false;
+ $upderr = false;
+
+ if (isset($srcPKDest)) {
+ if (is_null($origrow[$sizeofrow-3])) {
+ $doinsert = true;
+ $upderr = true;
+ }
+ }
+ if (!$upderr && !$dest->Execute($sa['UPD'],$row)) {
+ $err = true;
+ $upderr = true;
+ if ($this->errHandler) $this->_doerr('UPD',$row);
+ if (!$this->neverAbort) break;
+ }
+
+ if ($upderr || $dest->Affected_Rows() == 0) {
+ $doinsert = true;
+ } else {
+ if (!empty($uniqflds)) $this->RunUpdateSrcFn($src2, $table, $fldoffsets, $origrow, $srcwheress, 'UPD', null, $lastUpdateFld);
+ $upd += 1;
+ }
+ }
+
+ if ($doinsert) {
+ $inserr = false;
+ if (isset($srcPKDest)) {
+ $row = array_slice($origrow,0,$sizeofrow-2);
+ }
+
+ if (! $dest->Execute($sa['INS'],$row)) {
+ $err = true;
+ $inserr = true;
+ if ($this->errHandler) $this->_doerr('INS',$row);
+ if ($this->neverAbort) continue;
+ else break;
+ } else {
+ if ($dest->dataProvider == 'oci8') {
+ if ($this->oracleSequence) $lastid = $dest->GetOne("select ".$this->oracleSequence.".currVal from dual");
+ else $lastid = 'null';
+ } else {
+ $lastid = $dest->Insert_ID();
+ }
+
+ if (!$inserr && !empty($uniqflds)) {
+ $this->RunUpdateSrcFn($src2, $table, $fldoffsets, $origrow, $srcwheress, 'INS', $lastid,$lastUpdateFld);
+ }
+ $ins += 1;
+ }
+ }
+ $cnt += 1;
+
+ if ($commitRecs > 0 && ($cnt % $commitRecs) == 0) {
+ $dest->CommitTrans();
+ $dest->BeginTrans();
+
+ if ($this->updateSrcFn) {
+ $src2->CommitTrans();
+ $src2->BeginTrans();
+ }
+ }
+
+ } // while
+
+
+ if ($this->commitReplicate || $commitRecs > 0) {
+ if (!$this->neverAbort && $err) {
+ $dest->RollbackTrans();
+ if ($this->updateSrcFn) $src2->RollbackTrans();
+ } else {
+ $dest->CommitTrans();
+ if ($this->updateSrcFn) $src2->CommitTrans();
+ }
+ }
+ }
+ if ($cnt != $ins + $upd) echo "<p>ERROR: $cnt != INS $ins + UPD $upd</p>";
+ $src->setFetchMode($savemode);
+ return array(!$err, $cnt, $ins, $upd);
+ }
+ // trigger support only for sql server and oracle
+ // need to add
+ function MergeSrcSetup($srcTable, $pkeys, $srcUpdateDateFld, $srcCopyDateFld, $srcCopyFlagFld,
+ $srcCopyFlagType='C(1)', $srcCopyFlagVals = array('Y','N','P','='))
+ {
+ $sqla = array();
+ $src = $this->connSrc;
+ $idx = $srcTable.'_mrgIdx';
+ $cols = $src->MetaColumns($srcTable);
+ #adodb_pr($cols);
+ if (!isset($cols[strtoupper($srcUpdateDateFld)])) {
+ $sqla = $this->ddSrc->AddColumnSQL($srcTable, "$srcUpdateDateFld TS DEFTIMESTAMP");
+ foreach($sqla as $sql) $src->Execute($sql);
+ }
+
+ if ($srcCopyDateFld && !isset($cols[strtoupper($srcCopyDateFld)])) {
+ $sqla = $this->ddSrc->AddColumnSQL($srcTable, "$srcCopyDateFld TS DEFTIMESTAMP");
+ foreach($sqla as $sql) $src->Execute($sql);
+ }
+
+ $sysdate = $src->sysTimeStamp;
+ $arrv0 = $src->qstr($srcCopyFlagVals[0]);
+ $arrv1 = $src->qstr($srcCopyFlagVals[1]);
+ $arrv2 = $src->qstr($srcCopyFlagVals[2]);
+ $arrv3 = $src->qstr($srcCopyFlagVals[3]);
+
+ if ($srcCopyFlagFld && !isset($cols[strtoupper($srcCopyFlagFld)])) {
+ $sqla = $this->ddSrc->AddColumnSQL($srcTable, "$srcCopyFlagFld $srcCopyFlagType DEFAULT $arrv1");
+ foreach($sqla as $sql) $src->Execute($sql);
+ }
+
+ $sqla = array();
+
+
+ $name = "{$srcTable}_mrgTr";
+ if (is_array($pkeys) && strpos($src->databaseType,'mssql') !== false) {
+ $pk = reset($pkeys);
+
+ #$sqla[] = "DROP TRIGGER $name";
+ $sqltr = "
+ TRIGGER $name
+ ON $srcTable /* for data replication and merge */
+ AFTER UPDATE
+ AS
+ UPDATE $srcTable
+ SET
+ $srcUpdateDateFld = case when I.$srcCopyFlagFld = $arrv2 or I.$srcCopyFlagFld = $arrv3 then I.$srcUpdateDateFld
+ else $sysdate end,
+ $srcCopyFlagFld = case
+ when I.$srcCopyFlagFld = $arrv2 then $arrv0
+ when I.$srcCopyFlagFld = $arrv3 then D.$srcCopyFlagFld
+ else $arrv1 end
+ FROM $srcTable S Join Inserted AS I on I.$pk = S.$pk
+ JOIN Deleted as D ON I.$pk = D.$pk
+ WHERE I.$srcCopyFlagFld = D.$srcCopyFlagFld or I.$srcCopyFlagFld = $arrv2
+ or I.$srcCopyFlagFld = $arrv3 or I.$srcCopyFlagFld is null
+ ";
+ $sqla[] = 'CREATE '.$sqltr; // first if does not exists
+ $sqla[] = 'ALTER '.$sqltr; // second if it already exists
+ } else if (strpos($src->databaseType,'oci') !== false) {
+
+ if (strlen($srcTable)>22) $tableidx = substr($srcTable,0,16).substr(crc32($srcTable),6);
+ else $tableidx = $srcTable;
+
+ $name = $tableidx.$this->trgSuffix;
+ $idx = $tableidx.$this->idxSuffix;
+ $sqla[] = "
+CREATE OR REPLACE TRIGGER $name /* for data replication and merge */
+BEFORE UPDATE ON $srcTable REFERENCING NEW AS NEW OLD AS OLD
+FOR EACH ROW
+BEGIN
+ if :new.$srcCopyFlagFld = $arrv2 then
+ :new.$srcCopyFlagFld := $arrv0;
+ elsif :new.$srcCopyFlagFld = $arrv3 then
+ :new.$srcCopyFlagFld := :old.$srcCopyFlagFld;
+ elsif :old.$srcCopyFlagFld = :new.$srcCopyFlagFld or :new.$srcCopyFlagFld is null then
+ if $this->trLogic then
+ :new.$srcUpdateDateFld := $sysdate;
+ :new.$srcCopyFlagFld := $arrv1;
+ end if;
+ end if;
+END;
+";
+ }
+ foreach($sqla as $sql) $src->Execute($sql);
+
+ if ($srcCopyFlagFld) $srcCopyFlagFld .= ', ';
+ $src->Execute("CREATE INDEX {$idx} on $srcTable ($srcCopyFlagFld$srcUpdateDateFld)");
+ }
+
+
+ /*
+ Perform Merge by copying all data modified from src to dest
+ then update src copied flag if present.
+
+ Returns array taken from ReplicateData:
+
+ Returns an array:
+ $arr[0] = true if no error, false if error
+ $arr[1] = number of recs processed
+ $arr[2] = number of successful inserts
+ $arr[3] = number of successful updates
+
+ $srcTable = src table
+ $dstTable = dest table
+ $pkeys = primary keys array. if empty, then only inserts will occur
+ $srcignoreflds = ignore these flds (must be upper cased)
+ $setsrc = updateSrcFn string
+ $srcUpdateDateFld = field in src with the last update date
+ $srcCopyFlagFld = false = optional field that holds the copied indicator
+ $flagvals=array('Y','N','P','=') = array of values indicating array(copied, not copied).
+ Null is assumed to mean not copied. The 3rd value 'P' indicates that we want to force 'Y', bypassing
+ default trigger behaviour to reset the COPIED='N' when the record is replicated from other side.
+ The last value '=' is don't change copyflag.
+ $srcCopyDateFld = field that holds last copy date in src table, which will be updated on Merge()
+ $dstCopyDateFld = field that holds last copy date in dst table, which will be updated on Merge()
+ $defaultDestRaiseErrorFn = The adodb raiseErrorFn handler. Default is to not raise an error.
+ Just output error message to stdout
+
+ */
+
+
+ function Merge($srcTable, $dstTable, $pkeys, $srcignoreflds, $setsrc,
+ $srcUpdateDateFld,
+ $srcCopyFlagFld, $flagvals=array('Y','N','P','='),
+ $srcCopyDateFld = false,
+ $dstCopyDateFld = false,
+ $whereClauses = '',
+ $orderBy = '', # MUST INCLUDE THE "ORDER BY" suffix
+ $copyDoneFlagIdx = 3,
+ $defaultDestRaiseErrorFn = '')
+ {
+ $src = $this->connSrc;
+ $dest = $this->connDest;
+
+ $time = $src->Time();
+
+ $delfirst = $this->deleteFirst;
+ $upd = $this->updateSrcFn;
+
+ $this->deleteFirst = false;
+ //$this->updateFirst = true;
+
+ $srcignoreflds[] = $srcUpdateDateFld;
+ $srcignoreflds[] = $srcCopyFlagFld;
+ $srcignoreflds[] = $srcCopyDateFld;
+
+ if (empty($whereClauses)) $whereClauses = '1=1';
+ $where = " WHERE ($whereClauses) and ($srcCopyFlagFld = ".$src->qstr($flagvals[1]).')';
+ if ($orderBy) $where .= ' '.$orderBy;
+ else $where .= ' ORDER BY '.$srcUpdateDateFld;
+
+ if ($setsrc) $set[] = $setsrc;
+ else $set = array();
+
+ if ($srcCopyFlagFld) $set[] = "$srcCopyFlagFld = ".$src->qstr($flagvals[2]);
+ if ($srcCopyDateFld) $set[]= "$srcCopyDateFld = ".$src->sysTimeStamp;
+ if ($set) $this->updateSrcFn = array(implode(', ',$set));
+ else $this->updateSrcFn = '';
+
+
+ $extra[$srcCopyFlagFld] = array($dest->qstr($flagvals[0]),$dest->qstr($flagvals[$copyDoneFlagIdx]));
+
+ $saveraise = $dest->raiseErrorFn;
+ $dest->raiseErrorFn = '';
+
+ if ($this->compat && $this->compat == 1.0) $srcUpdateDateFld = '';
+ $arr = $this->ReplicateData($srcTable, $dstTable, $pkeys, $where, $srcignoreflds,
+ $dstCopyDateFld,$extra,$srcUpdateDateFld);
+
+ $dest->raiseErrorFn = $saveraise;
+
+ $this->updateSrcFn = $upd;
+ $this->deleteFirst = $delfirst;
+
+ return $arr;
+ }
+ /*
+ If doing a 2 way merge, then call
+ $rep->Merge()
+ to save without modifying the COPIEDFLAG ('=').
+
+ Then can the following to set the COPIEDFLAG to 'P' which forces the COPIEDFLAG = 'Y'
+ $rep->MergeDone()
+ */
+
+ function MergeDone($srcTable, $dstTable, $pkeys, $srcignoreflds, $setsrc,
+ $srcUpdateDateFld,
+ $srcCopyFlagFld, $flagvals=array('Y','N','P','='),
+ $srcCopyDateFld = false,
+ $dstCopyDateFld = false,
+ $whereClauses = '',
+ $orderBy = '', # MUST INCLUDE THE "ORDER BY" suffix
+ $copyDoneFlagIdx = 2,
+ $defaultDestRaiseErrorFn = '')
+ {
+ return $this->Merge($srcTable, $dstTable, $pkeys, $srcignoreflds, $setsrc,
+ $srcUpdateDateFld,
+ $srcCopyFlagFld, $flagvals,
+ $srcCopyDateFld,
+ $dstCopyDateFld,
+ $whereClauses,
+ $orderBy, # MUST INCLUDE THE "ORDER BY" suffix
+ $copyDoneFlagIdx,
+ $defaultDestRaiseErrorFn);
+ }
+
+ function _doerr($reason, $selflds)
+ {
+ $fn = $this->errHandler;
+ if ($fn) $fn($this, $reason, $selflds); // set $this->neverAbort to true or false as required inside $fn
+ }
+}