MySQL · Engine Introduction · Sphinx Source Code Analysis (Part 3)
In this section I will cover what Sphinx does before building the index: mainly pulling data from MySQL, then tokenizing, sorting, and storing everything in memory. Below are a few of the relevant config directives:
sql_query = \
    SELECT id, group_id, UNIX_TIMESTAMP(date_added) AS date_added, \
    title, content \
    FROM documents \
    WHERE id>=$start AND id<=$end
sql_query_range = SELECT MIN(id),MAX(id) FROM documents
sql_range_step = 1000
Here sql_query is the SQL statement Sphinx runs each time it pulls a batch of rows from MySQL; sql_query_range fetches the bounds (min and max id) of the data to pull; and sql_range_step sets how many rows each batch covers. When sql_query_range is used, sql_query must contain the $start and $end macros, which Sphinx substitutes for each batch. The ranged query step (RunQueryStep) executes in two situations: when data is pulled for the first time, and whenever the rows of the current range have been exhausted.
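To make the stepping concrete, here is a minimal standalone sketch, not Sphinx's actual code (the real logic lives in RunQueryStep, and ExpandRange here is a hypothetical helper), of how the $start/$end macros get substituted batch by batch:

#include <algorithm>
#include <cstdio>
#include <string>

// hypothetical stand-in for the macro substitution done by RunQueryStep:
// replace $start/$end in sql_query with the bounds of the current batch
static std::string ExpandRange ( const std::string & sQuery, long iStart, long iEnd )
{
    std::string sRes = sQuery;
    sRes.replace ( sRes.find ( "$start" ), 6, std::to_string ( iStart ) );
    sRes.replace ( sRes.find ( "$end" ), 4, std::to_string ( iEnd ) );
    return sRes;
}

int main ()
{
    // pretend sql_query_range returned MIN(id)=1, MAX(id)=2500
    long iMin = 1, iMax = 2500, iStep = 1000; // sql_range_step
    std::string sQuery = "SELECT id, title FROM documents WHERE id>=$start AND id<=$end";

    // three batches get issued: [1,1000], [1001,2000], [2001,2500]
    for ( long iStart = iMin; iStart<=iMax; iStart += iStep )
        printf ( "%s\n", ExpandRange ( sQuery, iStart, std::min ( iStart+iStep-1, iMax ) ).c_str() );
    return 0;
}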
Let's start with CSphSource_SQL::NextDocument. Its main job is to read rows from MySQL, then split them up and store them. First the reading part. The steps are simple: execute the query, check whether the current range has been exhausted, and if so advance to the next range via RunQueryStep. Note that Sphinx fetches and processes rows one at a time.
do
{
    // try to get next row
    bool bGotRow = SqlFetchRow ();

    // when the party's over...
    while ( !bGotRow )
    {
        // is that an error?
        if ( SqlIsError() )
        {
            sError.SetSprintf ( "sql_fetch_row: %s", SqlError() );
            m_tDocInfo.m_uDocID = 1; // 0 means legal eof
            return NULL;
        }

        // maybe we can do next step yet?
        if ( !RunQueryStep ( m_tParams.m_sQuery.cstr(), sError ) )
        {
            // if there's a message, there's an error
            // otherwise, we're just over
            if ( !sError.IsEmpty() )
            {
                m_tDocInfo.m_uDocID = 1; // 0 means legal eof
                return NULL;
            }
        } else
        {
            // step went fine; try to fetch
            bGotRow = SqlFetchRow ();
            continue;
        }

        SqlDismissResult ();

        // ok, we're over
        ARRAY_FOREACH ( i, m_tParams.m_dQueryPost )
        {
            if ( !SqlQuery ( m_tParams.m_dQueryPost[i].cstr() ) )
            {
                sphWarn ( "sql_query_post[%d]: error=%s, query=%s",
                    i, SqlError(), m_tParams.m_dQueryPost[i].cstr() );
                break;
            }
            SqlDismissResult ();
        }

        m_tDocInfo.m_uDocID = 0; // 0 means legal eof
        return NULL;
    }

    // get him!
    m_tDocInfo.m_uDocID = VerifyID ( sphToDocid ( SqlColumn(0) ) );
    m_uMaxFetchedID = Max ( m_uMaxFetchedID, m_tDocInfo.m_uDocID );
} while ( !m_tDocInfo.m_uDocID );
In the code above we can see a key field, m_uDocID, which holds the current document's id (so the database table must have such an id column).
After a row has been read, Sphinx starts processing it, splitting it up column by column; the main task is storing each database column into the corresponding index field:
// split columns into fields and attrs
for ( int i=0; i<m_iPlainFieldsLength; i++ )
{
    // get that field
#if USE_ZLIB
    if ( m_dUnpack[i]!=SPH_UNPACK_NONE )
    {
        DWORD uUnpackedLen = 0;
        m_dFields[i] = (BYTE*) SqlUnpackColumn ( i, uUnpackedLen, m_dUnpack[i] );
        m_dFieldLengths[i] = (int)uUnpackedLen;
        continue;
    }
#endif
    m_dFields[i] = (BYTE*) SqlColumn ( m_tSchema.m_dFields[i].m_iIndex );
    m_dFieldLengths[i] = SqlColumnLength ( m_tSchema.m_dFields[i].m_iIndex );
}
Next come the attributes. We will cover attributes in detail later; for now it is enough to know that an attribute is something like a secondary index (it does not go into the full-text index).
switch ( tAttr.m_eAttrType )
{
    case SPH_ATTR_STRING:
    case SPH_ATTR_JSON:
        // memorize string, fixup NULLs
        m_dStrAttrs[i] = SqlColumn ( tAttr.m_iIndex );
        if ( !m_dStrAttrs[i].cstr() )
            m_dStrAttrs[i] = "";
        m_tDocInfo.SetAttr ( tAttr.m_tLocator, 0 );
        break;

    ..................................

    default:
        // just store as uint by default
        m_tDocInfo.SetAttr ( tAttr.m_tLocator, sphToDword ( SqlColumn ( tAttr.m_iIndex ) ) ); // FIXME? report conversion errors maybe?
        break;
}
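For reference, attributes are declared with sql_attr_* directives in the data source config. With the sql_query above, the stock Sphinx example configuration declares:

sql_attr_uint = group_id
sql_attr_timestamp = date_added

With this, group_id and date_added are stored as attributes, while title and content go through tokenization into the full-text index.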
Now let's see how Sphinx processes the fetched data. The core code is in RtIndex_t::AddDocument, whose main job is tokenization (in IterateHits) and storing the results into the right data structure. The central data structure is RtAccum_t: before Sphinx writes the index out, everything is staged there. Note that Sphinx typically accumulates a lot of data and then submits it to the index engine in one go, and this structure is exactly what the engine processes; hence the call chain ends up in RtAccum_t::AddDocument.
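In other words, the overall call pattern is accumulate-many-then-commit-once. Here is a toy model of that flow (illustrative only; these are not Sphinx's real classes):

#include <cstdio>
#include <vector>

// toy model: hits pile up in an accumulator, and the index engine only
// sees them when Commit() hands over the whole batch at once
struct ToyAccum
{
    std::vector<int> m_dAccum; // stands in for RtAccum_t::m_dAccum
    void AddDocument ( int iHit ) { m_dAccum.push_back ( iHit ); }
};

struct ToyIndex
{
    ToyAccum m_tAcc;
    void AddDocument ( int iHit ) { m_tAcc.AddDocument ( iHit ); } // like RtIndex_t::AddDocument
    void Commit () // like RtIndex_t::Commit: sort, build a segment, hand it over
    {
        printf ( "committing %d accumulated hits as one segment\n", (int)m_tAcc.m_dAccum.size() );
        m_tAcc.m_dAccum.clear();
    }
};

int main ()
{
    ToyIndex tIndex;
    for ( int i=0; i<1000; i++ )
        tIndex.AddDocument ( i ); // many documents first...
    tIndex.Commit ();             // ...then a single commit
    return 0;
}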
Two things deserve attention here. The first is the m_dAccum member, a vector of CSphWordHit entries. Here is the definition of that structure:
struct CSphWordHit
{
    SphDocID_t  m_uDocID;   ///< document ID
    SphWordID_t m_uWordID;  ///< word ID in current dictionary
    Hitpos_t    m_uWordPos; ///< word position in current document
};
As you can see, this structure simply records one hit: which word occurred in which document at which position.
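To make this concrete, a document 7 whose text is "hello world hello" would conceptually yield hits like the following (the word ids are made up for illustration; real ones come from the dictionary, and a real Hitpos_t also encodes the field id, as we will see below):

#include <cstdint>

typedef uint64_t SphDocID_t;
typedef uint64_t SphWordID_t;
typedef uint32_t Hitpos_t;

struct CSphWordHit
{
    SphDocID_t  m_uDocID;
    SphWordID_t m_uWordID;
    Hitpos_t    m_uWordPos;
};

int main ()
{
    // hypothetical word ids: "hello"=101, "world"=202; positions shown raw
    CSphWordHit dHits[] =
    {
        { 7, 101, 1 }, // doc 7, "hello", position 1
        { 7, 202, 2 }, // doc 7, "world", position 2
        { 7, 101, 3 }, // doc 7, "hello" again, position 3
    };
    (void)dHits;
    return 0;
}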
Now the core code. It iterates over the hits just produced from the MySQL data, skips duplicates, and accumulates the rest:
int iHits = 0;
if ( pHits && pHits->Length() )
{
    CSphWordHit tLastHit;
    tLastHit.m_uDocID = 0;
    tLastHit.m_uWordID = 0;
    tLastHit.m_uWordPos = 0;

    iHits = pHits->Length();
    m_dAccum.Reserve ( m_dAccum.GetLength()+iHits );
    for ( const CSphWordHit * pHit = pHits->First(); pHit<=pHits->Last(); pHit++ )
    {
        // ignore duplicate hits
        if ( pHit->m_uDocID==tLastHit.m_uDocID && pHit->m_uWordID==tLastHit.m_uWordID && pHit->m_uWordPos==tLastHit.m_uWordPos )
            continue;

        // update field lengths
        if ( pFieldLens && HITMAN::GetField ( pHit->m_uWordPos )!=HITMAN::GetField ( tLastHit.m_uWordPos ) )
            pFieldLens [ HITMAN::GetField ( tLastHit.m_uWordPos ) ] = HITMAN::GetPos ( tLastHit.m_uWordPos );

        // accumulate
        m_dAccum.Add ( *pHit );
        tLastHit = *pHit;
    }
    if ( pFieldLens )
        pFieldLens [ HITMAN::GetField ( tLastHit.m_uWordPos ) ] = HITMAN::GetPos ( tLastHit.m_uWordPos );
}
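The HITMAN calls above hint that m_uWordPos is not a plain counter: a Hitpos_t packs both the field id and the in-field position into one DWORD. The exact bit layout is defined in sphinx.h; the sketch below only illustrates the idea with an assumed layout (field in the high 8 bits, position in the low 24), not the real constants:

#include <cstdio>

typedef unsigned int DWORD;
typedef DWORD Hitpos_t;

// assumed layout for illustration: field id in the high 8 bits,
// in-field position in the low 24 bits (the real HITMAN differs in detail)
static Hitpos_t Pack ( DWORD uField, DWORD uPos ) { return ( uField<<24 ) | ( uPos & 0xFFFFFFUL ); }
static DWORD GetField ( Hitpos_t uHitpos )        { return uHitpos>>24; }
static DWORD GetPos ( Hitpos_t uHitpos )          { return uHitpos & 0xFFFFFFUL; }

int main ()
{
    Hitpos_t uHit = Pack ( 2, 17 ); // field 2, 17th token
    printf ( "field=%u pos=%u\n", GetField ( uHit ), GetPos ( uHit ) );
    return 0;
}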
Once all of this is done, the data is ready to be handed to the index engine. The core code lives in RtIndex_t::Commit.
This function mainly does two things: first it takes the RtAccum_t we built above, sorts all the accumulated docs, and creates a segment, i.e. the corresponding in-memory index block (part of the ram chunk); then it calls CommitReplayable to commit the new segment (the actual flush to disk happens later).
You can think of it this way: the index kept in memory is made up of segments, and once memory usage reaches the configured limit, the in-memory index gets flushed to disk.
void RtIndex_t::Commit ( int * pDeleted, ISphRtAccum * pAccExt )
{
    assert ( g_bRTChangesAllowed );
    MEMORY ( MEM_INDEX_RT );

    RtAccum_t * pAcc = AcquireAccum ( NULL, pAccExt, true );
    if ( !pAcc )
        return;
    ...................................
    pAcc->Sort();

    RtSegment_t * pNewSeg = pAcc->CreateSegment ( m_tSchema.GetRowSize(), m_iWordsCheckpoint );
    .............................................
    // now on to the stuff that needs locking and recovery
    CommitReplayable ( pNewSeg, pAcc->m_dAccumKlist, pDeleted );
    ......................................
}
Next, let's look at RtAccum_t::CreateSegment, which writes the tokenized data into the ram chunk. Two data structures matter here, RtDoc_t and RtWord_t, which hold per-document and per-keyword information respectively.
The structures are simple, and the trailing comments explain the members well:
template < typename DOCID = SphDocID_t >
struct RtDoc_T
{
    DOCID m_uDocID;     ///< my document id
    DWORD m_uDocFields; ///< fields mask
    DWORD m_uHits;      ///< hit count
    DWORD m_uHit;       ///< either index into segment hits, or the only hit itself (if hit count is 1)
};

template < typename WORDID=SphWordID_t >
struct RtWord_T
{
    union
    {
        WORDID m_uWordID;   ///< my keyword id
        const BYTE * m_sWord;
    };
    DWORD m_uDocs; ///< document count (for stats and/or BM25)
    DWORD m_uHits; ///< hit count (for stats and/or BM25)
    DWORD m_uDoc;  ///< index into segment docs
};
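Note how the pieces chain together: RtWord_T::m_uDoc points into the segment's doc list, and each RtDoc_T either stores its only hit inline in m_uHit (when m_uHits==1) or uses m_uHit as an offset into the segment's hit list. Ignoring the compression the real readers have to decode, the lookup path looks roughly like this (toy structures and made-up data, for illustration only):

#include <cstdio>
#include <vector>

typedef unsigned int DWORD;

// simplified, uncompressed stand-ins for the structures above
struct Doc  { DWORD m_uDocID, m_uHits, m_uHit; };
struct Word { DWORD m_uWordID, m_uDocs, m_uDoc; };

// toy segment: in the real ram chunk these are compressed byte streams
std::vector<DWORD> g_dHits = { 1, 5, 9 };             // hit positions
std::vector<Doc>   g_dDocs = { { 7, 1, 42 },          // one hit: stored inline in m_uHit
                               { 9, 2, 1 } };         // two hits: m_uHit indexes g_dHits
std::vector<Word>  g_dWords = { { 101, 2, 0 } };      // word 101 starts at doc index 0

int main ()
{
    const Word & tWord = g_dWords[0];
    for ( DWORD d=0; d<tWord.m_uDocs; d++ )
    {
        const Doc & tDoc = g_dDocs [ tWord.m_uDoc+d ];
        printf ( "doc %u:", tDoc.m_uDocID );
        if ( tDoc.m_uHits==1 )
            printf ( " %u\n", tDoc.m_uHit ); // single hit kept inline
        else
        {
            for ( DWORD h=0; h<tDoc.m_uHits; h++ )
                printf ( " %u", g_dHits [ tDoc.m_uHit+h ] );
            printf ( "\n" );
        }
    }
    return 0;
}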
Now the code. First the writer objects are initialized; note that they all write into the segment we just created:
RtDocWriter_t tOutDoc ( pSeg );
RtWordWriter_t tOutWord ( pSeg, m_bKeywordDict, iWordsCheckpoint );
RtHitWriter_t tOutHit ( pSeg );
Then the data is written. The main job here is aggregation: hits that belong to the same keyword are grouped together under one entry:
ARRAY_FOREACH ( i, m_dAccum )
{
    .......................................
    // new keyword; flush current keyword
    if ( tHit.m_uWordID!=tWord.m_uWordID )
    {
        tOutDoc.ZipRestart ();
        if ( tWord.m_uWordID )
        {
            if ( m_bKeywordDict )
            {
                const BYTE * pPackedWord = pPacketBase + tWord.m_uWordID;
                assert ( pPackedWord[0] && pPackedWord[0]+1<m_pDictRt->GetPackedLen() );
                tWord.m_sWord = pPackedWord;
            }
            tOutWord.ZipWord ( tWord );
        }

        tWord.m_uWordID = tHit.m_uWordID;
        tWord.m_uDocs = 0;
        tWord.m_uHits = 0;
        tWord.m_uDoc = tOutDoc.ZipDocPtr();
        uPrevHit = EMPTY_HIT;
    }
    ..................
}
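The Zip* names refer to the compression applied as entries are appended: values are delta-encoded against the previous entry and written as variable-length bytes, 7 bits per byte. Here is a minimal sketch of that classic varint scheme (the real writers in sphinxrt.cpp differ in details):

#include <cstdio>
#include <vector>

typedef unsigned int DWORD;

// classic 7-bits-per-byte varint; after sorting, consecutive deltas are
// small in the common case, so most entries cost a single byte
static void ZipDword ( std::vector<unsigned char> & dOut, DWORD uValue )
{
    do
    {
        unsigned char uByte = uValue & 0x7f;
        uValue >>= 7;
        if ( uValue )
            uByte |= 0x80; // high bit set: more bytes follow
        dOut.push_back ( uByte );
    } while ( uValue );
}

int main ()
{
    std::vector<unsigned char> dOut;
    DWORD dDocIDs[] = { 3, 5, 6, 130 }, uLast = 0;
    for ( DWORD uID : dDocIDs )
    {
        ZipDword ( dOut, uID-uLast ); // store the delta, not the raw id
        uLast = uID;
    }
    printf ( "4 docids -> %d bytes\n", (int)dOut.size() ); // deltas 3,2,1,124: 4 bytes
    return 0;
}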
That's it for this installment. Next time we will analyze the most critical part: how Sphinx flushes data to disk.
Last updated: 2017-10-21 09:04:25