閱讀157 返回首頁    go 阿裏雲 go 技術社區[雲棲]


MOM係列文章之 - Spring Jms Integration 解讀

       前陣子對Spring Jms實現進行了一些擴展,借此機會係統化地研究了一下Spring對JMS的支持,整理成文,希望大家能夠喜歡!

       本文打算從兩個維度(編程API和包結構)進行闡述,希望大家讀完,能對Spring在JMS層麵上做的事情有一個大致了解。當然喜歡扣細節的朋友,也歡迎提出你的疑惑!

    第一部分:編程API

       首先,讓我們來看下Spring中我們最最經常用到的JmsTemplate,上圖

                                                                     

        從繼承關係上,我們先來看下接口 JmsOperations,基本上可以歸納出這幾類方法:

      Conveniencemethods for sending messages

      Conveniencemethods for sending auto-converted messages

      Conveniencemethods for receiving messages

      Conveniencemethods for receiving auto-converted messages

      Conveniencemethods for browsing messages


        但要注意的是這裏麵的方法throws出來的異常非JMS 1.1裏麵的標準JMSException,而是被轉譯過的JmsException。同時可以看出這個接口

充分遵循了CQRS原則。一個MQ其實就是Wrapper後的Queue,數據結構的知識告訴我們,queue有兩種存儲結構:Array and  LinkedList。Array擅長隨機讀取,LinkedList則擅長刪除更新操作,一旦底層采用 了LinkedList結構,Brower就是個大問題,這個要格外注意一下。

        再來看下JmsDestinationAccessor,該類繼承自JmsAccessor(該類實現了InitializingBean,不解釋),注意裏麵的DestinationResolver類,主要是從簡單的String類型的名字解析成具體的Destination,其默認的實現DynamicDestinationResolver基本上已經夠用了。舉個例子,倘若你要擴展將其解析成zookeeper可識別的Location,可以考慮實現該類。

         好,終於輪到JmsTemplate了,先貼一段Javadoc(這裏麵有兩個地方需要先了解下)

This template uses a org.springframework.jms.support.destination.DynamicDestinationResolver and a SimpleMessageConverter as default strategies for resolving a destination name or converting a message, respectively. These defaults can be overridden through the "destinationResolver" and "messageConverter" bean properties.

        直白,不解釋了。。。。。。

NOTE: The ConnectionFactory used with this template should return pooled Connections (or a single shared Connection) as well as pooled Sessions and MessageProducers. Otherwise, performance of ad-hoc JMS operations is going to suffer.

        池化工廠,理由也很充分了。Spring隻提供了SingleConnectionFactory,至於池化麼,具體的Broker自己去實現,像AMQ在其內部就有基於Commons pool類庫的PooledConnectionFactory。

        ok,下麵我們深入JmsTemplate,了解其中幾個重要的方法

/**
	 * Execute the action specified by the given action object within a
	 * JMS Session. Generalized version of {@code execute(SessionCallback)},
	 * allowing the JMS Connection to be started on the fly.
	 * <p>Use {@code execute(SessionCallback)} for the general case.
	 * Starting the JMS Connection is just necessary for receiving messages,
	 * which is preferably achieved through the {@code receive} methods.
	 * @param action callback object that exposes the Session
	 * @param startConnection whether to start the Connection
	 * @return the result object from working with the Session
	 * @throws JmsException if there is any problem
	 * @see #execute(SessionCallback)
	 * @see #receive
	 */
	public <T> T execute(SessionCallback<T> action, boolean startConnection) throws JmsException {
		Assert.notNull(action, "Callback object must not be null");
		Connection conToClose = null;
		Session sessionToClose = null;
		try {
                        //通過事務同步管理器獲取與當前線程綁定的Resouce,這裏是JmsResourceHolder
			Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
					getConnectionFactory(), this.transactionalResourceFactory, startConnection);
			if (sessionToUse == null) {
				conToClose = createConnection();
				sessionToClose = createSession(conToClose);
				if (startConnection) {
					conToClose.start();
				}
				sessionToUse = sessionToClose;
			}
			if (logger.isDebugEnabled()) {
				logger.debug("Executing callback on JMS Session: " + sessionToUse);
			}
			return action.doInJms(sessionToUse);
		}
		catch (JMSException ex) {
			//注意這裏的妙處 - 異常轉譯
			throw convertJmsAccessException(ex);
		}
		finally {
			JmsUtils.closeSession(sessionToClose);
			ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection);
		}
	}


/**
	 * Send the given JMS message.
	 * @param session the JMS Session to operate on
	 * @param destination the JMS Destination to send to
	 * @param messageCreator callback to create a JMS Message
	 * @throws JMSException if thrown by JMS API methods
	 */
	protected void doSend(Session session, Destination destination, MessageCreator messageCreator)
			throws JMSException {

		Assert.notNull(messageCreator, "MessageCreator must not be null");
		MessageProducer producer = createProducer(session, destination);
		try {
			Message message = messageCreator.createMessage(session);
			if (logger.isDebugEnabled()) {
				logger.debug("Sending created message: " + message);
			}
			doSend(producer, message);
			// Check commit - avoid commit call within a JTA transaction.
			if (session.getTransacted() && isSessionLocallyTransacted(session)) {
				// Transacted session created by this template -> commit.
				JmsUtils.commitIfNecessary(session);
			}
		}
		finally {
			JmsUtils.closeMessageProducer(producer);
		}
	}


public void convertAndSend(
			Destination destination, final Object message, final MessagePostProcessor postProcessor)
			throws JmsException {

		send(destination, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				Message msg = getRequiredMessageConverter().toMessage(message, session);
				return postProcessor.postProcessMessage(msg);//注意這裏不是對消息發送的後置處理,而是對消息Converter的後置處理(消息發送前的一個Hook)
			}
		});
	}


/**
	 * Actually receive a JMS message.
	 * @param session the JMS Session to operate on
	 * @param consumer the JMS MessageConsumer to receive with
	 * @return the JMS Message received, or {@code null} if none
	 * @throws JMSException if thrown by JMS API methods
	 */
	protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException {
		try {
			// Use transaction timeout (if available).
			long timeout = getReceiveTimeout();
			JmsResourceHolder resourceHolder =
					(JmsResourceHolder) TransactionSynchronizationManager.getResource(getConnectionFactory());
			if (resourceHolder != null && resourceHolder.hasTimeout()) {
				timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis());
			}
			Message message = doReceive(consumer, timeout);
			if (session.getTransacted()) {
				// Commit necessary - but avoid commit call within a JTA transaction.
				if (isSessionLocallyTransacted(session)) {
					// Transacted session created by this template -> commit.
					JmsUtils.commitIfNecessary(session);
				}
			}
			else if (isClientAcknowledge(session)) {
				// Manually acknowledge message, if any.
				if (message != null) {
					message.acknowledge();
				}
			}
			return message;
		}
		finally {
			JmsUtils.closeMessageConsumer(consumer);
		}
	}

     關鍵代碼處已經有注釋了,這裏就不再贅述了,掌握了這幾個核心方法,這個類就算拿下了。

     恩,從編程API的角度來看,差不多就這些內容了。

   第二部分:包結構

     下麵,我們從包結構的角度再來進一步了解一下Spring對Jms的集成,如下圖:


                                                

      org.springframework.jms包裏麵提供了一些JMS規範異常的runtime版本,看看jms2在這方麵的改進,就知道spring在這方麵已然是先驅了。

      org.springframework.jms.config包裏麵放置了對Jms schema的解析,這是spring為我們提供的一個非常有用的特性,schema用的好的話,也可以做到麵向接口編程,擴展性極好。這方麵感興趣的同學,推薦閱讀這裏https://openwebx.org/docs/Webx3_Guide_Book.html,深入了解下Webx是怎麼利用Schema實現OCP原則的。

      org.springframework.jms.connection包裏麵放置了一些與Connection相關的工具類(ConnectionFactoryUtils),基礎類(JmsResourceHolder)。這裏重點關注一下JmsTransactionManagerextendsAbstractPlatformTransactionManager,其中的doXXX方法非常有看點),這個類也是JMS本地事務處理的一個核心工作類,如下:

                                             


       org.springframework.jms.core包裏麵主要是spring封裝的一些回調接口,如BrowserCallbackMessageCreatorMessagePostProcessorProducerCallbackSessionCallback,當然我們之前分析過的JmsTemplate也在這個包裏麵。

       org.springframework.jms.core.support包裏麵就一個抽象類JmsGatewaySupport,暫時沒怎麼用,就是在afterPropertiesSet方法裏麵內置了一個initGateway方法,用來做一些定製化操作(custominitialization behavior)。

         org.springframework.jms.listener和org.springframework.jms.listener.adapter包,我們要重點關注一下,剛才編程式API主要介紹了消息的發送,消息的接受是怎麼處理的呢,主要看這兩個包裏麵的類。類圖如下:

       

             

   

          我們先來了解下SimpleMessageListenerContainer的核心方法:

           

/**
	 * Create a MessageConsumer for the given JMS Session,
	 * registering a MessageListener for the specified listener.
	 * @param session the JMS Session to work on
	 * @return the MessageConsumer
	 * @throws JMSException if thrown by JMS methods
	 * @see #executeListener
	 */
	protected MessageConsumer createListenerConsumer(final Session session) throws JMSException {
		Destination destination = getDestination();
		if (destination == null) {
			destination = resolveDestinationName(session, getDestinationName());
		}
		MessageConsumer consumer = createConsumer(session, destination);

		if (this.taskExecutor != null) {
			consumer.setMessageListener(new MessageListener() {
				public void onMessage(final Message message) {
					taskExecutor.execute(new Runnable() {
						public void run() {
							processMessage(message, session);
						}
					});
				}
			});
		}
		else {
			consumer.setMessageListener(new MessageListener() {
				public void onMessage(Message message) {
					processMessage(message, session);
				}
			});
		}

		return consumer;
	}
        怎麼樣,很簡單吧?非常簡單的調度算法,也沒有失敗重連等高級功能。如果需要這些功能,怎麼辦?ok,是時候DefaultMessageListenerContainer出場了,一個功能相對比較豐富的Listener容器,和SimpleMessageListenerContainer不同,它使用AsyncMessageListenerInvoker執行一個loopedMessageConsumer.receive()調用來接收消息,注意這裏的Executor,默認是SimpleAsyncTaskExecutor,文檔裏寫的很清楚:
NOTE: This implementation does not reuse threads! Consider a thread-pooling TaskExecutor implementation instead, in particular for executing a large number of short-lived tasks.
        來看看這個類裏麵幾個重要的成員變量,首先是concurrentConsumers和maxConcurrentConsumers。通過設置setConcurrency方法,可以scale up number of consumers between the minimum number ofconsumersconcurrentConsumersand the maximum number of consumers(maxConcurrentConsumers)。那麼單個消費任務如何消費消息呢,這裏又有一個變量需要注意一下,即idleTaskExecutionLimit,官方的解釋很清楚了:

Within each task execution, a number of message reception attempts (according to the "maxMessagesPerTask" setting) will each wait for an incoming message (according to the "receiveTimeout" setting). If all of those receive attempts in a given task return without a message, the task is considered idle with respect to received messages. Such a task may still be rescheduled; however, once it reached the specified "idleTaskExecutionLimit", it will shut down (in case of dynamic scaling).
         接下來,我們來看這個類裏麵最最重要的調度方法,在其內部類AsyncMessageListenerInvoker裏麵,如下:

public void run() {
			synchronized (lifecycleMonitor) {
				activeInvokerCount++;
				lifecycleMonitor.notifyAll();
			}
			boolean messageReceived = false;
			try {
				if (maxMessagesPerTask < 0) {
					messageReceived = executeOngoingLoop();
				}
				else {
					int messageCount = 0;
					while (isRunning() && messageCount < maxMessagesPerTask) {
						messageReceived = (invokeListener() || messageReceived);
						messageCount++;
					}
				}
			}
			catch (Throwable ex) {
				clearResources();
				if (!this.lastMessageSucceeded) {
					// We failed more than once in a row - sleep for recovery interval
					// even before first recovery attempt.
					sleepInbetweenRecoveryAttempts();
				}
				this.lastMessageSucceeded = false;
				boolean alreadyRecovered = false;
				synchronized (recoveryMonitor) {
					if (this.lastRecoveryMarker == currentRecoveryMarker) {
						handleListenerSetupFailure(ex, false);
						recoverAfterListenerSetupFailure();
						currentRecoveryMarker = new Object();
					}
					else {
						alreadyRecovered = true;
					}
				}
				if (alreadyRecovered) {
					handleListenerSetupFailure(ex, true);
				}
			}
			finally {
				synchronized (lifecycleMonitor) {
					decreaseActiveInvokerCount();
					lifecycleMonitor.notifyAll();
				}
				if (!messageReceived) {
					this.idleTaskExecutionCount++;
				}
				else {
					this.idleTaskExecutionCount = 0;
				}
				synchronized (lifecycleMonitor) {
					if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) {
						// We're shutting down completely.
						scheduledInvokers.remove(this);
						if (logger.isDebugEnabled()) {
							logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size());
						}
						lifecycleMonitor.notifyAll();
						clearResources();
					}
					else if (isRunning()) {
						int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount();
						if (nonPausedConsumers < 1) {
							logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " +
									"Check your thread pool configuration! Manual recovery necessary through a start() call.");
						}
						else if (nonPausedConsumers < getConcurrentConsumers()) {
							logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " +
									"due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " +
									"to be triggered by remaining consumers.");
						}
					}
				}
			}
		}

		private boolean executeOngoingLoop() throws JMSException {
			boolean messageReceived = false;
			boolean active = true;
			while (active) {
				synchronized (lifecycleMonitor) {
					boolean interrupted = false;
					boolean wasWaiting = false;
					while ((active = isActive()) && !isRunning()) {
						if (interrupted) {
							throw new IllegalStateException("Thread was interrupted while waiting for " +
									"a restart of the listener container, but container is still stopped");
						}
						if (!wasWaiting) {
							decreaseActiveInvokerCount();
						}
						wasWaiting = true;
						try {
							lifecycleMonitor.wait();
						}
						catch (InterruptedException ex) {
							// Re-interrupt current thread, to allow other threads to react.
							Thread.currentThread().interrupt();
							interrupted = true;
						}
					}
					if (wasWaiting) {
						activeInvokerCount++;
					}
					if (scheduledInvokers.size() > maxConcurrentConsumers) {
						active = false;
					}
				}
				if (active) {
					messageReceived = (invokeListener() || messageReceived);
				}
			}
			return messageReceived;
		}

      差不多這個類就介紹到這裏,繼續往下看吧~

      org.springframework.jms.listener.endpoint包裏麵提供了一些JavaEE特性 – 對JCA的支持,這裏就不展開了。

      org.springframework.jms.supportorg.springframework.jms.support.converter,org.springframework.jms.support.destination則分別提供了Jms工具類JmsUtils(依我來看,JmsAccessor類可以考慮放到core包裏麵,而把一些工具類抽到這裏來),針對消息轉換器(主要包括三類轉換,Object<->Message,XML<->Message,Json<->Message),Destination的支持,難度不大,這裏也就不展開討論了

      org.springframework.jms.remoting包則告訴我們底層可以通過JMS走遠程服務,類似RMI的Remoting。

      ok,差不多就這些內容。看了這麼多,最後我們再總結一下Spring對JMS封裝的不足之處吧:

     (1) Spring對JMS的封裝停留在JMS 1.1規範上(1.0.2中的支持Deprecated了),JMS 2的支持在最新的4.0 版本中未曾找見;

     (2) 消息發送&接收的時候沒有預留鉤子方法。比方說我們有這樣的需求 - 跟蹤消息走向,在消息發送完後向本地的agent寫一點數據,agent定時,定量推送數據去server端做統計運算,展示等。這個時候就沒有out-of-box的方法可以去實現,當然變通的方法也有不少,但不適合和開源版本融合;

     (3) 缺少一些容錯策略,比方說消息發送失敗,如何處理?

     (4) 缺少連接複用,一種很重要的提升性能策略。


       如果有不明白的地方,歡迎大家留言討論!


參考資料:

https://docs.spring.io/spring/docs/4.0.0.RELEASE/spring-framework-reference/htmlsingle/

      

最後更新:2017-04-03 12:53:51

  上一篇:go SplishActivity的作用
  下一篇:go C++編程規範之33用小類代替巨類