MySQL · Engine Introduction · Sphinx Source Code Analysis (Part 3)

In this section I will walk through what Sphinx does before it builds the index: pulling data from MySQL, then tokenizing it, sorting it, and storing it in memory, among other steps. Here are the relevant configuration directives:

    sql_query = \
        SELECT id, group_id, UNIX_TIMESTAMP(date_added) AS date_added, \
            title, content \
        FROM documents
    sql_query_range = SELECT MIN(id),MAX(id) FROM documents
    sql_range_step = 1000

Here, sql_query is the SQL statement Sphinx runs to pull data from MySQL, sql_query_range fetches the ID range of the rows to be pulled, and sql_range_step sets how many rows to pull per step. sql_query_range is executed in two cases: the first time data is pulled, and whenever the current range has been read completely.
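
Conceptually, the ranged fetch behaves like the sketch below (this is not Sphinx source; the real logic lives in CSphSource_SQL::RunQueryStep, and ExpandMacros/RunQuery are hypothetical helpers declared purely for illustration). Keep in mind that when sql_query_range is configured, sql_query is expected to contain the $start and $end macros that get substituted on every step:

    #include <cstdint>
    #include <string>

    // Hypothetical helpers, declared only for illustration.
    std::string ExpandMacros ( const std::string & sQuery, uint64_t uStart, uint64_t uEnd );
    void RunQuery ( const std::string & sSql );

    // Each step rewrites sql_query's $start/$end macros and runs one windowed SELECT.
    void FetchByRanges ( const std::string & sQuery, uint64_t uMin, uint64_t uMax, uint64_t uStep )
    {
        for ( uint64_t uStart = uMin; uStart<=uMax; uStart += uStep )
        {
            uint64_t uEnd = uStart + uStep - 1;
            if ( uEnd>uMax )
                uEnd = uMax;
            RunQuery ( ExpandMacros ( sQuery, uStart, uEnd ) );
        }
    }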

Let's start with CSphSource_SQL::NextDocument, whose job is to read a row from MySQL and split it up for storage. First, the reading part. The steps are simple: execute the corresponding SQL, then check whether the current range has been fully read; if it has, advance to the next range via sql_query_range (RunQueryStep). Note that Sphinx reads and processes rows one at a time.

	do
	{
		// try to get next row
		bool bGotRow = SqlFetchRow ();

		// when the party's over...
		while ( !bGotRow )
		{
			// is that an error?
			if ( SqlIsError() )
			{
				sError.SetSprintf ( "sql_fetch_row: %s", SqlError() );
				m_tDocInfo.m_uDocID = 1; // 0 means legal eof
				return NULL;
			}

			// maybe we can do next step yet?
			if ( !RunQueryStep ( m_tParams.m_sQuery.cstr(), sError ) )
			{
				// if there's a message, there's an error
				// otherwise, we're just over
				if ( !sError.IsEmpty() )
				{
					m_tDocInfo.m_uDocID = 1; // 0 means legal eof
					return NULL;
				}

			} else
			{
				// step went fine; try to fetch
				bGotRow = SqlFetchRow ();
				continue;
			}

			SqlDismissResult ();

			// ok, we're over
			ARRAY_FOREACH ( i, m_tParams.m_dQueryPost )
			{
				if ( !SqlQuery ( m_tParams.m_dQueryPost[i].cstr() ) )
				{
					sphWarn ( "sql_query_post[%d]: error=%s, query=%s",
						i, SqlError(), m_tParams.m_dQueryPost[i].cstr() );
					break;
				}
				SqlDismissResult ();
			}

			m_tDocInfo.m_uDocID = 0; // 0 means legal eof
			return NULL;
		}

		// get him!
		m_tDocInfo.m_uDocID = VerifyID ( sphToDocid ( SqlColumn(0) ) );
		m_uMaxFetchedID = Max ( m_uMaxFetchedID, m_tDocInfo.m_uDocID );
	} while ( !m_tDocInfo.m_uDocID );

In the code above, the key field is m_uDocID, which holds the current document's ID (this is why the indexed table must have such an id column).

After a row is read, it is processed: the row is split by column, and each configured database column is stored into the corresponding index field.

	// split columns into fields and attrs
	for ( int i=0; i<m_iPlainFieldsLength; i++ )
	{
		// get that field
		#if USE_ZLIB
		if ( m_dUnpack[i]!=SPH_UNPACK_NONE )
		{
			DWORD uUnpackedLen = 0;
			m_dFields[i] = (BYTE*) SqlUnpackColumn ( i, uUnpackedLen, m_dUnpack[i] );
			m_dFieldLengths[i] = (int)uUnpackedLen;
			continue;
		}
		#endif
		m_dFields[i] = (BYTE*) SqlColumn ( m_tSchema.m_dFields[i].m_iIndex );
		m_dFieldLengths[i] = SqlColumnLength ( m_tSchema.m_dFields[i].m_iIndex );
	}

Next come the attributes. We will cover attributes in detail later; for now, just think of an attribute as something like a secondary index: it is stored per document but does not go into the full-text index.

		switch ( tAttr.m_eAttrType )
		{
			case SPH_ATTR_STRING:
			case SPH_ATTR_JSON:
				// memorize string, fixup NULLs
				m_dStrAttrs[i] = SqlColumn ( tAttr.m_iIndex );
				if ( !m_dStrAttrs[i].cstr() )
					m_dStrAttrs[i] = "";

				m_tDocInfo.SetAttr ( tAttr.m_tLocator, 0 );
				break;
..................................
			default:
				// just store as uint by default
				m_tDocInfo.SetAttr ( tAttr.m_tLocator, sphToDword ( SqlColumn ( tAttr.m_iIndex ) ) ); // FIXME? report conversion errors maybe?
				break;
		}

Now let's see how Sphinx handles the fetched data. The core code is in RtIndex_t::AddDocument, which tokenizes the document (in IterateHits) and stores the result in the relevant data structure. The central structure is RtAccum_t: before Sphinx writes the index to a file, data is accumulated here. Note that Sphinx typically accumulates a fair amount of data and then submits it to the index engine in a single batch, and this structure is exactly what the engine processes. The call eventually lands in RtAccum_t::AddDocument.
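
A rough sketch of this accumulate-then-commit flow (not the exact Sphinx call signatures; IndexBatch is an invented wrapper used purely for illustration):

    // Hits produced by tokenization are buffered per session in an RtAccum_t and
    // only become searchable once the whole batch is handed over via Commit().
    void IndexBatch ( RtIndex_t * pIndex, ISphRtAccum * pAcc )
    {
        // for each row returned by CSphSource_SQL::NextDocument():
        //   pIndex->AddDocument(...)   -> IterateHits() tokenizes the fields
        //                              -> RtAccum_t::AddDocument() appends CSphWordHit entries
        // once the batch is complete:
        pIndex->Commit ( NULL, pAcc );  // sort hits, build a ram-chunk segment
    }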

There are a couple of things to note here. The first is the m_dAccum member, a vector of CSphWordHit entries; here is that structure's definition:

    struct CSphWordHit
    {
        SphDocID_t		m_uDocID;		///< document ID
        SphWordID_t		m_uWordID;		///< word ID in current dictionary
        Hitpos_t		m_uWordPos;		///< word position in current document
    };

As you can see, this structure simply records one tokenized hit: which document, which word, and at which position.
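
For concreteness, here is what a single hit might look like for a tiny document (all ids are made up; HITMAN::Create packs a field number and an in-field position into a Hitpos_t):

    CSphWordHit tHit;
    tHit.m_uDocID   = 42;                        // document id taken from SqlColumn(0)
    tHit.m_uWordID  = 1001;                      // hypothetical dictionary id for "hello"
    tHit.m_uWordPos = HITMAN::Create ( 0, 1 );   // field #0, in-field position 1
    // a second token "world" would produce another hit with HITMAN::Create ( 0, 2 )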

Now the core code. It iterates over the hits produced from the MySQL data, skips duplicates, and accumulates the rest:

	int iHits = 0;
	if ( pHits && pHits->Length() )
	{
		CSphWordHit tLastHit;
		tLastHit.m_uDocID = 0;
		tLastHit.m_uWordID = 0;
		tLastHit.m_uWordPos = 0;

		iHits = pHits->Length();
		m_dAccum.Reserve ( m_dAccum.GetLength()+iHits );
		for ( const CSphWordHit * pHit = pHits->First(); pHit<=pHits->Last(); pHit++ )
		{
			// ignore duplicate hits
			if ( pHit->m_uDocID==tLastHit.m_uDocID && pHit->m_uWordID==tLastHit.m_uWordID && pHit->m_uWordPos==tLastHit.m_uWordPos )
				continue;

			// update field lengths
			if ( pFieldLens && HITMAN::GetField ( pHit->m_uWordPos )!=HITMAN::GetField ( tLastHit.m_uWordPos ) )
				pFieldLens [ HITMAN::GetField ( tLastHit.m_uWordPos ) ] = HITMAN::GetPos ( tLastHit.m_uWordPos );

			// accumulate
			m_dAccum.Add ( *pHit );
			tLastHit = *pHit;
		}
		if ( pFieldLens )
			pFieldLens [ HITMAN::GetField ( tLastHit.m_uWordPos ) ] = HITMAN::GetPos ( tLastHit.m_uWordPos );
	}
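
The HITMAN helpers used in the field-length bookkeeping above pack the field number and the in-field position into a single Hitpos_t value; a small usage sketch with made-up numbers:

    Hitpos_t uPos = HITMAN::Create ( 2, 17 );          // field #2, position 17
    int iField    = (int) HITMAN::GetField ( uPos );   // -> 2
    int iInPos    = (int) HITMAN::GetPos ( uPos );     // -> 17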

Once all of this is done, the data is submitted to the index engine; the core code lives in RtIndex_t::Commit.

This function does two main things: it takes the RtAccum_t we built above, sorts all of its documents, and creates a segment, i.e. the corresponding index block (ram chunk); it then calls CommitReplayable to commit the new ram chunk to the index.

You can think of it this way: a segment is the index as kept in memory; once the in-memory size reaches its limit, the in-memory index is flushed to disk.

    void RtIndex_t::Commit ( int * pDeleted, ISphRtAccum * pAccExt )
    {
        assert ( g_bRTChangesAllowed );
        MEMORY ( MEM_INDEX_RT );

        RtAccum_t * pAcc = AcquireAccum ( NULL, pAccExt, true );
        if ( !pAcc )
            return;

    ...................................
        pAcc->Sort();

        RtSegment_t * pNewSeg = pAcc->CreateSegment ( m_tSchema.GetRowSize(), m_iWordsCheckpoint );
    .............................................

        // now on to the stuff that needs locking and recovery
        CommitReplayable ( pNewSeg, pAcc->m_dAccumKlist, pDeleted );
    ......................................
    }

Next, the RtAccum_t::CreateSegment function, which stores the tokenized data into the ram chunk. Two data structures matter here, RtDoc_t and RtWord_t, which describe a document entry and a keyword entry respectively.

The structures are simple, and the comments explain the fields:

    template < typename DOCID = SphDocID_t >
    struct RtDoc_T
    {
        DOCID						m_uDocID;	///< my document id
        DWORD						m_uDocFields;	///< fields mask
        DWORD						m_uHits;	///< hit count
        DWORD						m_uHit;		///< either index into segment hits, or the only hit itself (if hit count is 1)
    };

    template < typename WORDID=SphWordID_t >
    struct RtWord_T
    {
        union
        {
            WORDID					m_uWordID;	///< my keyword id
            const BYTE *			m_sWord;
        };
        DWORD						m_uDocs;	///< document count (for stats and/or BM25)
        DWORD						m_uHits;	///< hit count (for stats and/or BM25)
        DWORD						m_uDoc;		///< index into segment docs
    };
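
One detail worth spelling out is the overloaded m_uHit field in RtDoc_T: when a document has exactly one hit for a keyword, the hit itself is stored inline instead of an offset into the segment's hit list. A hedged illustration with made-up values (RtDoc_t is assumed to be the default instantiation of RtDoc_T):

    RtDoc_t tDoc;
    tDoc.m_uDocID     = 42;                       // hypothetical document id
    tDoc.m_uDocFields = 1<<0;                     // the keyword occurs in field #0
    tDoc.m_uHits      = 1;                        // exactly one hit ...
    tDoc.m_uHit       = HITMAN::Create ( 0, 3 );  // ... so it is stored inline
    // with m_uHits > 1, m_uHit would instead be an offset into the zipped hit list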

Now the code. First the writers are initialized; note that all of them write into the segment we just created:

	RtDocWriter_t tOutDoc ( pSeg );
	RtWordWriter_t tOutWord ( pSeg, m_bKeywordDict, iWordsCheckpoint );
	RtHitWriter_t tOutHit ( pSeg );

Then the data is written. This is essentially an aggregation step: hits belonging to the same keyword are grouped together.

	ARRAY_FOREACH ( i, m_dAccum )
	{
        .......................................
		// new keyword; flush current keyword
		if ( tHit.m_uWordID!=tWord.m_uWordID )
		{
			tOutDoc.ZipRestart ();
			if ( tWord.m_uWordID )
			{
				if ( m_bKeywordDict )
				{
					const BYTE * pPackedWord = pPacketBase + tWord.m_uWordID;
					assert ( pPackedWord[0] && pPackedWord[0]+1<m_pDictRt->GetPackedLen() );
					tWord.m_sWord = pPackedWord;
				}
				tOutWord.ZipWord ( tWord );
			}

			tWord.m_uWordID = tHit.m_uWordID;
			tWord.m_uDocs = 0;
			tWord.m_uHits = 0;
			tWord.m_uDoc = tOutDoc.ZipDocPtr();
			uPrevHit = EMPTY_HIT;
		}
        ..................
    }

That's all for this installment. Next time we will look at the most important part: how Sphinx flushes data to disk.
