在現代應用程序中,及時更新不同數據庫之間的數據至關重要。本文將介紹如何在 SQL Server 中使用 C# 實現數據的實時同步。我們將使用 SQLDependency
類來監聽數據庫表的變化,并將這些變化實時地同步到另一張表中。
前提條件
在開始之前,請確保已經設置好兩個 SQL Server 數據庫:
SourceDB
: 包含你需要監聽的表。
TargetDB
: 目標數據庫,用于同步數據。
配置 SQL Server
首先,需要啟用 SQL Server 的查詢通知服務,以便支持 SQLDependency
。請使用以下命令啟用數據庫服務代理:
查看
SELECT name, is_broker_enabled
FROM sys.databases;
ALTER DATABASE SourceDB SET ENABLE_BROKER;
編寫 C# 程序
下面的 C# 程序將使用 SQLDependency
來監聽 SourceDB
中的 SourceTable
表的變化。我們將在數據插入時同步到 TargetDB
中的 TargetTable
。
程序代碼
using System;
using System.Data;
using System.Data.SqlClient;
using System.Configuration;
class Program
{
private static bool _continueRunning = true;
static void Main()
{
Console.WriteLine("數據同步程序已啟動。按 'Q' 鍵退出。");
// 設置連接字符串
string sourceConnectionString = ConfigurationManager.ConnectionStrings["SourceDB"].ConnectionString;
string targetConnectionString = ConfigurationManager.ConnectionStrings["TargetDB"].ConnectionString;
// 啟用 SQLDependency
SqlDependency.Start(sourceConnectionString);
try
{
while (_continueRunning)
{
try
{
using (SqlConnection sourceConnection = new SqlConnection(sourceConnectionString))
{
sourceConnection.Open();
StartListening(sourceConnection);
// 保持連接打開狀態
while (_continueRunning)
{
if (Console.KeyAvailable)
{
var key = Console.ReadKey(true).Key;
if (key == ConsoleKey.Q)
{
_continueRunning = false;
break;
}
}
Thread.Sleep(100);
}
}
}
catch (Exception ex)
{
Console.WriteLine($"發生錯誤: {ex.Message}");
Console.WriteLine("5秒后重試...");
Thread.Sleep(5000);
}
}
}
finally
{
SqlDependency.Stop(sourceConnectionString);
Console.WriteLine("數據同步程序已停止。");
}
}
private static void StartListening(SqlConnection connection)
{
using (SqlCommand command = new SqlCommand("SELECT ID, Name, Value, Created_Time FROM dbo.t1", connection))
{
SqlDependency dependency = new SqlDependency(command);
dependency.OnChange += new OnChangeEventHandler(OnDataChange);
using (SqlDataReader reader = command.ExecuteReader())
{
// 初次加載數據處理
}
}
}
private static void OnDataChange(object sender, SqlNotificationEventArgs e)
{
if (e.Info == SqlNotificationInfo.Insert)
{
Console.WriteLine("數據已插入。事件類型: " + e.Info.ToString());
SyncData();
}
// 重新啟用監聽
string sourceConnectionString = ConfigurationManager.ConnectionStrings["SourceDB"].ConnectionString;
using (SqlConnection sourceConnection = new SqlConnection(sourceConnectionString))
{
sourceConnection.Open();
StartListening(sourceConnection);
}
}
private static void SyncData()
{
string sourceConnectionString = ConfigurationManager.ConnectionStrings["SourceDB"].ConnectionString;
string targetConnectionString = ConfigurationManager.ConnectionStrings["TargetDB"].ConnectionString;
using (SqlConnection sourceConnection = new SqlConnection(sourceConnectionString))
using (SqlConnection targetConnection = new SqlConnection(targetConnectionString))
{
sourceConnection.Open();
targetConnection.Open();
// 獲取最新插入的數據
SqlCommand fetchDataCommand = new SqlCommand("SELECT TOP 1 * FROM t1 ORDER BY Created_Time DESC", sourceConnection);
using (SqlDataReader dataReader = fetchDataCommand.ExecuteReader())
{
if (dataReader.Read())
{
Guid id = (Guid)dataReader["ID"];
string name = (string)dataReader["Name"];
decimal value = (decimal)dataReader["Value"];
DateTime created_time = (DateTime)dataReader["created_time"];
// 將數據插入到 TargetTable
SqlCommand insertCommand = new SqlCommand("INSERT INTO t1 (ID, Name, Value,Created_Time) VALUES (@ID, @Name, @Value,@Created_Time)", targetConnection);
insertCommand.Parameters.AddWithValue("@ID", id);
insertCommand.Parameters.AddWithValue("@Name", name);
insertCommand.Parameters.AddWithValue("@Value", value);
insertCommand.Parameters.AddWithValue("@Created_Time", created_time);
insertCommand.ExecuteNonQuery();
}
}
}
}
}
增加更新后同步
private static void SyncUpdatedData()
{
string sourceConnectionString = ConfigurationManager.ConnectionStrings["SourceDB"].ConnectionString;
string targetConnectionString = ConfigurationManager.ConnectionStrings["TargetDB"].ConnectionString;
using (SqlConnection sourceConnection = new SqlConnection(sourceConnectionString))
using (SqlConnection targetConnection = new SqlConnection(targetConnectionString))
{
sourceConnection.Open();
targetConnection.Open();
// 獲取最近更新的數據
// 注意:這里假設你有一個 Last_Updated_Time 字段來跟蹤更新時間
SqlCommand fetchDataCommand = new SqlCommand("SELECT TOP 1 * FROM t1 ORDER BY Last_Updated_Time DESC", sourceConnection);
using (SqlDataReader dataReader = fetchDataCommand.ExecuteReader())
{
if (dataReader.Read())
{
Guid id = (Guid)dataReader["ID"];
string name = (string)dataReader["Name"];
decimal value = (decimal)dataReader["Value"];
DateTime last_updated_time = (DateTime)dataReader["Last_Updated_Time"];
// 更新目標表中的數據
SqlCommand updateCommand = new SqlCommand(
"UPDATE t1 SET Name = @Name, Value = @Value, Last_Updated_Time = @Last_Updated_Time WHERE ID = @ID",
targetConnection);
updateCommand.Parameters.AddWithValue("@ID", id);
updateCommand.Parameters.AddWithValue("@Name", name);
updateCommand.Parameters.AddWithValue("@Value", value);
updateCommand.Parameters.AddWithValue("@Last_Updated_Time", last_updated_time);
int rowsAffected = updateCommand.ExecuteNonQuery();
if (rowsAffected > 0)
{
Console.WriteLine($"已同步更新的數據: ID={id}, Name={name}, Value={value}, Created_Time={last_updated_time}");
}
else
{
Console.WriteLine($"未找到要更新的記錄: ID={id}");
}
}
}
}
}
配置文件 (App.config
)
確保在你的項目中包含一個配置文件來管理數據庫連接字符串。
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<connectionStrings>
<add name="SourceDB" connectionString="Data Source=your_source_server;Initial Catalog=SourceDB;Integrated Security=True" />
<add name="TargetDB" connectionString="Data Source=your_target_server;Initial Catalog=TargetDB;Integrated Security=True" />
</connectionStrings>
</configuration>
關鍵點說明
注意事項
確保在 SQL Server 上啟用查詢通知和服務代理。
SQLDependency
適用于簡單查詢,不能包括復雜查詢、聯接或聚合。
如果項目對性能和實時性要求較高,建議結合其他工具或技術方案,如 Change Tracking
或 Change Data Capture
等。
通過以上步驟,你可以實現對 SQL 數據庫變化的實時監聽和數據同步,從而保持數據庫之間的數據一致性和實時性。
該文章在 2024/10/30 15:06:16 編輯過