我們現(xiàn)在的物聯(lián)網(wǎng)環(huán)境下,有部分?jǐn)?shù)據(jù),采樣頻率為2000條記錄/分鐘,這樣下來(lái)一天24*60*2000=2880000約等于300萬(wàn)條數(shù)據(jù),以后必然還會(huì)增加。之前數(shù)據(jù)庫(kù)使用的是mssql,對(duì)于數(shù)據(jù)庫(kù)的壓力很大,同時(shí)又需要保證歷史查詢的響應(yīng)速度,這種情況下,在單表中數(shù)據(jù)量大,同時(shí)存在讀寫(xiě)操作。不得已采用MongoDB來(lái)存儲(chǔ)數(shù)據(jù)。如果使用MongoDB,則至少需要三臺(tái)機(jī)器,兩臺(tái)實(shí)現(xiàn)讀寫(xiě)分離,一臺(tái)作為仲裁(當(dāng)然條件不允許也可以不用),每臺(tái)機(jī)器的內(nèi)存暫時(shí)配置在16G,公司小,沒(méi)辦法,據(jù)說(shuō),使用這個(gè)MongoDB需要機(jī)器內(nèi)存最少92G,我沒(méi)有驗(yàn)證過(guò),但是吃內(nèi)存是公認(rèn)的,所以內(nèi)存絕對(duì)要保證,就算保證了,也不一定完全就沒(méi)有意外發(fā)生。我們上面的這些特殊的數(shù)據(jù)是允許少量的丟失的,這些只是做分析使用的,幾個(gè)月了,暫時(shí)還沒(méi)出現(xiàn)數(shù)據(jù)丟失的情況,可能最新版本早就修復(fù)了吧,新手使用建議多看下官網(wǎng)上的說(shuō)明。下面直接奔入主題:
以上是詳細(xì)的配置參數(shù),其中路徑部分根據(jù)需要更改, 這里設(shè)置的oplogsize大小為10G,根據(jù)業(yè)務(wù)場(chǎng)景進(jìn)行調(diào)整,另外auth權(quán)限為null,因?yàn)樵O(shè)置權(quán)限會(huì)增加服務(wù)開(kāi)銷(xiāo),影響效率,最下面幾行是內(nèi)存引擎,可以控制副本集同步及內(nèi)存限制,防止內(nèi)存泄露。
此時(shí)副本集集群配置已經(jīng)完成,然后在命令行中輸入:rs.status(),查看副本集狀態(tài),需要查看同步情況,可以輸入命令:db.serverStatus().
Note:內(nèi)存持續(xù)上升,內(nèi)部沒(méi)有內(nèi)存回收機(jī)制,若限制內(nèi)存 ,則可能出現(xiàn)查詢速度變慢,數(shù)據(jù)丟失等問(wèn)題,建議優(yōu)化查詢效率,建立索引
using MongoDB.Driver;
using MongoDB.Bson;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using MongoDB.Bson.Serialization.Attributes;
using MongoDB.Driver.Linq;
using System.Configuration;
using System.IO;
using UnitoonIot.AppSetting;
namespace UnitoonIot.Mongo
{
public class MongoDb
{
private static string ConnectionStringHost ;
private static string UserName ;
private static string Password;
private static IMongoDatabase _db = null;
private static readonly object LockHelper = new object();
/// summary>
/// mongodb初始化
/// /summary>
public static void Init()
{
ConnectionStringHost = "10.1.5.24:27016,10.1.5.24:27016,10.1.5.26:27017";
//AppSettings.GetConfigValue("MongoHost");//"10.1.5.24:27016";
UserName = AppSettings.GetConfigValue("MongoUserName");
Password = AppSettings.GetConfigValue("MongoPwd");
}
static MongoDb()
{
}
public static IMongoDatabase GetDb(string dbName,string options=null)
{
if (_db != null) return _db;
lock (LockHelper)
{
if (_db != null) return _db;
var database = dbName;
var userName = UserName;
var password = Password;
var authentication = string.Empty;
var host = string.Empty;
if (!string.IsNullOrWhiteSpace(userName))
{
authentication = string.Concat(userName, ':', password, '@');
}
if (!string.IsNullOrEmpty(options) !options.StartsWith("?"))
{
options = string.Concat('?', options);
}
host = string.IsNullOrEmpty(ConnectionStringHost) ? "localhost" : ConnectionStringHost;
database = database ?? "testdb";
//mongodb://[username:password@]host1[:port1][,host2[:port2],…[,hostN[:portN]]][/[database][?options]]
var conString = options!=null? $"mongodb://{authentication}{host}/{database}{options}"
: $"mongodb://{authentication}{host}/{database}";
var url = new MongoUrl(conString);
var mcs = MongoClientSettings.FromUrl(url);
mcs.MaxConnectionLifeTime = TimeSpan.FromMilliseconds(1000);
var client = new MongoClient(mcs);
_db = client.GetDatabase(url.DatabaseName);
}
return _db;
}
}
/// summary>
/// MongoDb 數(shù)據(jù)庫(kù)操作類
/// /summary>
public class MongoRepositoryT>: IMongoRepositoryT> where T : BaseEntity
{
#region readonly field
/// summary>
/// 表名
/// /summary>
private readonly IMongoCollectionT> _collection = null;
/// summary>
/// 數(shù)據(jù)庫(kù)對(duì)象
/// /summary>
private readonly IMongoDatabase _database;
#endregion
/// summary>
/// 構(gòu)造函數(shù)
/// /summary>
public MongoRepository()
{
this._database = MongoDb.GetDb(Activator.CreateInstanceT>().DbName, "readPreference =secondaryPreferred ");//primaryPreferred/secondaryPreferred/nearest
_collection = _database.GetCollectionT>(typeof(T).Name);
}
#region 增加
/// summary>
/// 插入對(duì)象
/// /summary>
/// param name="t">插入的對(duì)象/param>
public virtual T Insert(T t)
{
// var flag = ObjectId.GenerateNewId();
// t.GetType().GetProperty("Id").SetValue(t, flag);
//t.Time = DateTime.Now;
_collection.InsertOne(t);
return t;
}
/// summary>
/// 批量插入
/// /summary>
/// param name="ts">要插入的對(duì)象集合/param>
public virtual IEnumerableT> InsertBatch(IEnumerableT> ts)
{
_collection.InsertMany(ts);
return ts;
}
/// summary>
/// 插入對(duì)象
/// /summary>
/// param name="t">插入的對(duì)象/param>
public virtual void InsertAsync(T t)
{
//var flag = ObjectId.GenerateNewId();
// t.GetType().GetProperty("Id").SetValue(t, flag);
// t.Time = DateTime.Now;
_collection.InsertOneAsync(t);
}
/// summary>
/// 批量插入
/// /summary>
/// param name="ts">要插入的對(duì)象集合/param>
public virtual void InsertBatchAsync(IEnumerableT> ts)
{
_collection.InsertManyAsync(ts);
}
#endregion
#region 刪除
/// summary>
/// 刪除
/// /summary>
/// returns>/returns>
public virtual long Delete(T t)
{
var filter = BuildersT>.Filter.Eq("Id", t.Id);
var result = _collection.DeleteOne(filter);
return result.DeletedCount;
}
/// summary>
/// 刪除
/// /summary>
/// returns>/returns>
public virtual void DeleteAsync(T t)
{
var filter = BuildersT>.Filter.Eq("Id", t.Id);
_collection.DeleteOneAsync(filter);
}
/// summary>
/// 按條件表達(dá)式刪除
/// /summary>
/// param name="predicate">條件表達(dá)式/param>
/// returns>/returns>
public virtual long Delete(ExpressionFuncT, bool>> predicate)
{
var result = _collection.DeleteOne(predicate);
return result.DeletedCount;
}
/// summary>
/// 按條件表達(dá)式刪除
/// /summary>
/// param name="predicate">條件表達(dá)式/param>
/// returns>/returns>
public virtual void DeleteAsync(ExpressionFuncT, bool>> predicate)
{
_collection.DeleteOneAsync(predicate);
}
/// summary>
/// 按條件表達(dá)式批量刪除
/// /summary>
/// param name="predicate">條件表達(dá)式/param>
/// returns>/returns>
public virtual long DeleteBatch(ExpressionFuncT, bool>> predicate)
{
var result = _collection.DeleteMany(predicate);
return result.DeletedCount;
}
/// summary>
/// 按條件表達(dá)式批量刪除
/// /summary>
/// param name="predicate">條件表達(dá)式/param>
/// returns>/returns>
public virtual void DeleteBatchAsync(ExpressionFuncT, bool>> predicate)
{
_collection.DeleteManyAsync(predicate);
}
/// summary>
/// 按檢索條件刪除
/// 建議用BuildersT>構(gòu)建復(fù)雜的查詢條件
/// /summary>
/// param name="filter">條件/param>
/// returns>/returns>
public virtual long Delete(FilterDefinitionT> filter)
{
var result = _collection.DeleteOne(filter);
return result.DeletedCount;
}
/// summary>
/// 按檢索條件刪除
/// 建議用BuildersT>構(gòu)建復(fù)雜的查詢條件
/// /summary>
/// param name="filter">條件/param>
/// returns>/returns>
public virtual void DeleteAsync(FilterDefinitionT> filter)
{
_collection.DeleteOneAsync(filter);
}
#endregion
#region 修改
/// summary>
/// 修改(Id不變)
/// /summary>
/// returns>/returns>
public virtual long Update(T t)
{
var filterBuilder = BuildersT>.Filter;
var filter = filterBuilder.Eq("Id",t.Id);
var update = _collection.ReplaceOne(filter, t, new UpdateOptions() { IsUpsert = true });
return update.ModifiedCount;
}
/// summary>
/// 修改(Id不變)
/// /summary>
/// returns>/returns>
public virtual void UpdateAsync(T t)
{
var filterBuilder = BuildersT>.Filter;
var filter = filterBuilder.Eq("Id", t.Id);
_collection.ReplaceOneAsync(filter, t, new UpdateOptions() { IsUpsert = true });
}
/// summary>
/// 用新對(duì)象替換新文檔
/// /summary>
/// param name="filter">查詢條件/param>
/// param name="t">對(duì)象/param>
/// returns>修改影響文檔數(shù)/returns>
public virtual long Update(ExpressionFuncT, bool>> filter, T t)
{
var update = _collection.ReplaceOne(filter, t, new UpdateOptions() { IsUpsert = true });
return update.ModifiedCount;
}
/// summary>
/// 用新對(duì)象替換新文檔
/// /summary>
/// param name="filter">查詢條件/param>
/// param name="t">對(duì)象/param>
/// returns>修改影響文檔數(shù)/returns>
public virtual long Update(FilterDefinitionT> filter, T t)
{
var update = _collection.ReplaceOne(filter, t, new UpdateOptions() { IsUpsert = true });
return update.ModifiedCount;
}
/// summary>
/// 用新對(duì)象替換新文檔
/// /summary>
/// param name="filter">查詢條件/param>
/// param name="t">對(duì)象/param>
/// returns>修改影響文檔數(shù)/returns>
public virtual void UpdateAsync(ExpressionFuncT, bool>> filter, T t)
{
_collection.ReplaceOneAsync(filter, t, new UpdateOptions() { IsUpsert = true });
}
/// summary>
/// 用新對(duì)象替換新文檔
/// /summary>
/// param name="filter">查詢條件/param>
/// param name="t">對(duì)象/param>
/// returns>修改影響文檔數(shù)/returns>
public virtual void UpdateAsync(FilterDefinitionT> filter, T t)
{
_collection.ReplaceOneAsync(filter, t, new UpdateOptions() { IsUpsert = true });
}
/// summary>
/// 根據(jù)Id和條件文檔
/// /summary>
/// param name="update">修改條件-形如:Builders/T/>.Update.Set(filed, value)/param>
/// param name="id">對(duì)象Id/param>
/// returns>修改影響文檔數(shù)/returns>
public virtual long Update(string id, UpdateDefinitionT> update)
{
var filterBuilder = BuildersT>.Filter;
var filter = filterBuilder.Eq("Id", new ObjectId(id));
var result = _collection.UpdateOne(filter, update, new UpdateOptions() { IsUpsert = true });
return result.ModifiedCount;
}
/// summary>
/// 根據(jù)Id和條件文檔
/// /summary>
/// param name="update">修改條件-形如:Builders/T/>.Update.Set(filed, value)/param>
/// param name="id">對(duì)象Id/param>
/// returns>修改影響文檔數(shù)/returns>
public virtual void UpdateAsync(string id, UpdateDefinitionT> update)
{
var filterBuilder = BuildersT>.Filter;
var filter = filterBuilder.Eq("Id", new ObjectId(id));
_collection.UpdateOneAsync(filter, update, new UpdateOptions() { IsUpsert = true });
}
/// summary>
/// 根據(jù)條件修改文檔
/// /summary>
/// param name="update">修改條件-形如:Builders/T/>.Update.Set(filed, value)/param>
/// param name="filter">查詢條件Builders/T/>.Filter.Eq(filed, value)/param>
/// returns>修改影響文檔數(shù)/returns>
public virtual void Update(UpdateDefinitionT> update,ExpressionFuncT, bool>> filter)
{
_collection.UpdateOne(filter, update, new UpdateOptions() { IsUpsert = true });
}
/// summary>
/// 根據(jù)條件修改文檔
/// /summary>
/// param name="update">修改條件-形如:Builders/T/>.Update.Set(filed, value)/param>
/// param name="filter">查詢條件Builders/T/>.Filter.Eq(filed, value)/param>
/// returns>修改影響文檔數(shù)/returns>
public virtual long Update(UpdateDefinitionT> update, FilterDefinitionT> filter)
{
var result = _collection.UpdateOne(filter, update, new UpdateOptions() { IsUpsert = true });
return result.ModifiedCount;
}
/// summary>
/// 根據(jù)條件修改文檔
/// /summary>
/// param name="update">修改條件-形如:Builders/T/>.Update.Set(filed, value)/param>
/// param name="filter">查詢條件Builders/T/>.Filter.Eq(filed, value)/param>
/// returns>修改影響文檔數(shù)/returns>
public virtual void UpdateAsync(UpdateDefinitionT> update, ExpressionFuncT, bool>> filter)
{
_collection.UpdateOneAsync(filter, update, new UpdateOptions() { IsUpsert = true });
}
/// summary>
/// 根據(jù)條件修改文檔
/// /summary>
/// param name="update">修改條件-形如:Builders/T/>.Update.Set(filed, value)/param>
/// param name="filter">查詢條件Builders/T/>.Filter.Eq(filed, value)/param>
/// returns>修改影響文檔數(shù)/returns>
public virtual void UpdateAsync(UpdateDefinitionT> update, FilterDefinitionT> filter)
{
_collection.UpdateOneAsync(filter, update, new UpdateOptions() { IsUpsert = true });
}
/// summary>
/// 根據(jù)條件批量修改文檔
/// /summary>
/// param name="update">修改條件-形如:Builders/T/>.Update.Set(filed, value)/param>
/// param name="filter">查詢條件Builders/T/>.Filter.Eq(filed, value)/param>
/// returns>修改影響文檔數(shù)/returns>
public virtual long UpdateBatch(UpdateDefinitionT> update, ExpressionFuncT, bool>> filter)
{
var result = _collection.UpdateMany(filter, update, new UpdateOptions() { IsUpsert = true });
return result.ModifiedCount;
}
/// summary>
/// 根據(jù)條件批量修改文檔
/// /summary>
/// param name="update">修改條件-形如:Builders/T/>.Update.Set(filed, value)/param>
/// param name="filter">查詢條件Builders/T/>.Filter.Eq(filed, value)/param>
/// returns>修改影響文檔數(shù)/returns>
public virtual long UpdateBatch(UpdateDefinitionT> update, FilterDefinitionT> filter)
{
var result = _collection.UpdateMany(filter, update, new UpdateOptions() { IsUpsert = true });
return result.ModifiedCount;
}
/// summary>
/// 根據(jù)條件批量修改文檔
/// /summary>
/// param name="update">修改條件-形如:Builders/T/>.Update.Set(filed, value)/param>
/// param name="filter">查詢條件Builders/T/>.Filter.Eq(filed, value)/param>
/// returns>修改影響文檔數(shù)/returns>
public virtual void UpdateBatchAsync(UpdateDefinitionT> update, ExpressionFuncT, bool>> filter)
{
_collection.UpdateManyAsync(filter, update, new UpdateOptions() { IsUpsert = true });
}
/// summary>
/// 根據(jù)條件批量修改文檔
/// /summary>
/// param name="update">修改條件-形如:Builders/T/>.Update.Set(filed, value)/param>
/// param name="filter">查詢條件Builders/T/>.Filter.Eq(filed, value)/param>
/// returns>修改影響文檔數(shù)/returns>
public virtual void UpdateBatchAsync(UpdateDefinitionT> update, FilterDefinitionT> filter)
{
_collection.UpdateManyAsync(filter, update, new UpdateOptions() { IsUpsert = true });
}
#endregion
#region 查詢
#region GetCollection
/// summary>
/// 獲取操作對(duì)象的IMongoCollection集合,強(qiáng)類型對(duì)象集合
/// /summary>
/// returns>/returns>
public virtual IMongoCollectionT> GetCollection()
{
return _database.GetCollectionT>(typeof(T).Name);
}
#endregion
#region GetSingle
/// summary>
/// 查詢數(shù)據(jù)庫(kù),檢查是否存在指定ID的對(duì)象
/// /summary>
/// param name="id">對(duì)象的ID值/param>
/// returns>存在則返回指定的對(duì)象,否則返回Null/returns>
public virtual T GetById(string id)
{
var filterBuilder = BuildersT>.Filter;
var filter = filterBuilder.Eq("Id", new ObjectId(id));
var data = _collection.Find(filter).FirstOrDefault();
return data;
}
/// summary>
/// 查詢數(shù)據(jù)庫(kù),檢查是否存在指定ID的對(duì)象
/// /summary>
/// param name="id">對(duì)象的ID值/param>
/// returns>存在則返回指定的對(duì)象,否則返回Null/returns>
public virtual async TaskT> GetAsyncById(string id)
{
var filterBuilder = BuildersT>.Filter;
var filter = filterBuilder.Eq("Id", new ObjectId(id));
var data = await _collection.FindAsync(filter);
return await data.SingleOrDefaultAsync();
}
/// summary>
/// 查詢數(shù)據(jù)
/// /summary>
/// param name="filter">過(guò)濾條件/param>
/// returns>/returns>
public virtual T Get(FilterDefinitionT> filter)
{
return _collection.Find(filter).FirstOrDefault();
}
/// summary>
/// 查詢數(shù)據(jù)
/// /summary>
/// param name="filter">條件表達(dá)式/param>
/// returns>/returns>
public virtual T Get(ExpressionFuncT,bool>> filter)
{
return _collection.Find(filter).FirstOrDefault();
}
/// summary>
/// 查詢數(shù)據(jù)
/// /summary>
/// param name="filter">過(guò)濾條件/param>
/// returns>/returns>
public virtual async TaskT> GetAsync(FilterDefinitionT> filter)
{
var data = await _collection.FindAsync(filter);
return await data.SingleOrDefaultAsync();
}
/// summary>
/// 查詢數(shù)據(jù)
/// /summary>
/// param name="filter">條件表達(dá)式/param>
/// returns>/returns>
public virtual async TaskT> GetAsync(ExpressionFuncT, bool>> filter)
{
var data = await _collection.FindAsync(filter);
return await data.SingleOrDefaultAsync();
}
#endregion
#region GetMany
/// summary>
/// 查詢部分?jǐn)?shù)據(jù)
/// /summary>
/// param name="filter">過(guò)濾條件/param>
/// returns>/returns>
public virtual IEnumerableT> GetMany(FilterDefinitionT> filter)
{
return _collection.Find(filter).ToEnumerable();
}
/// summary>
/// 查詢部分?jǐn)?shù)據(jù)
/// /summary>
/// param name="filter">條件表達(dá)式/param>
/// returns>/returns>
public virtual IEnumerableT> GetMany(ExpressionFuncT,bool>> filter)
{
//return _collection.AsQueryable().Where(filter).ToList();
//return _collection.AsQueryable().Where(filter);
return _collection.Find(filter).ToEnumerable(); //.ToEnumerable();
}
/// summary>
/// 查詢部分?jǐn)?shù)據(jù)
/// /summary>
/// param name="filter">過(guò)濾條件/param>
/// returns>/returns>
public virtual async TaskIEnumerableT>> GetManyAsync(FilterDefinitionT> filter)
{
var data = await _collection.FindAsync(filter);
return await data.ToListAsync();
}
/// summary>
/// 查詢部分?jǐn)?shù)據(jù)
/// /summary>
/// param name="filter">過(guò)濾條件/param>
/// returns>/returns>
public virtual async TaskIEnumerableT>> GetManyAsync(ExpressionFuncT, bool>> filter)
{
var data = await _collection.FindAsync(filter);
return await data.ToListAsync();
}
#endregion
#region GetAll
/// summary>
/// 查詢所有記錄,復(fù)雜查詢直接用Linq處理(避免全表掃描)
/// /summary>
/// returns>要查詢的對(duì)象/returns>
public virtual IEnumerableT> GetAll()
{
var data = _collection.AsQueryable();
return data.ToEnumerable();
}
/// summary>
/// 查詢所有記錄,復(fù)雜查詢直接用Linq處理(避免全表掃描)
/// /summary>
/// returns>要查詢的對(duì)象/returns>
public virtual async TaskIEnumerableT>> GetAllAsync()
{
var data = _collection.AsQueryable();
return await data.ToListAsync();
}
/// summary>
/// 查詢所有記錄,復(fù)雜查詢直接用Linq處理(避免全表掃描)
/// /summary>
/// returns>要查詢的對(duì)象/returns>
public virtual IQueryableT> GetAllQueryable()
{
return _collection.AsQueryable();
}
#endregion
#region MapReduce
/// summary>
/// MapReduce
/// /summary>
/// returns>返回一個(gè)List列表數(shù)據(jù)/returns>
public IEnumerableT> GetMap(BsonJavaScript map,BsonJavaScript reduce)
{
return _collection.MapReduceT>(map,reduce).ToList();
}
#endregion
#endregion
}
}