聊一聊高併發高可用那些事 – Kafka篇

目錄

為什麼需要消息隊列

1.異步 :一個下單流程,你需要扣積分,扣優惠卷,發短信等,有些耗時又不需要立即處理的事,可以丟到隊列里異步處理。

2.削峰 :按平常的流量,服務器剛好可以正常負載。偶爾推出一個優惠活動時,請求量極速上升。由於服務器 Redis,MySQL 承受能力不一樣,如果請求全部接收,服務器負載不了會導致宕機。加機器嘛,需要去調整配置,活動結束後用不到了,即麻煩又浪費。這時可以將請求放到隊列里,按照服務器的能力去消費。

3.解耦 :一個訂單流程,需要扣積分,優惠券,發短信等調用多個接口,出現問題時不好排查。像發短信有很多地方需要用到, 如果哪天修改了短信接口參數,用到的地方都得修改。這時可以將要發送的內容放到隊列里,起一個服務去消費, 統一發送短信。

高吞吐、高可用 MQ 對比分析

看了幾個招聘網站,提到較多的消息隊列有:RabbitMQ、RocketMQ、Kafka 以及 Redis 的消息隊列和發布訂閱模式。

Redis 隊列是用 List 數據結構模擬的,指定一端 Push,另一端 Pop,一條消息只能被一個程序所消費。如果要一對多消費的,可以用 Redis 的發布訂閱模式。Redis 發布訂閱是實時消費的,服務端不會保存生產的消息,也不會記錄客戶端消費到哪一條。在消費的時候如果客戶端宕機了,消息就會丟失。這時就需要用到高級的消息隊列,如 RocketMQ、Kafka 等。

ZeroMQ 只有點對點模式和 Redis 發布訂閱模式差不多,如果不是對性能要求極高,我會用其它隊列代替,畢竟關解決開發環境所需的依賴庫就夠折騰的。

RabbitMQ 多語言支持比較完善,特性的支持也比較齊全,但是吞吐量相對小些,而且基於 Erlang 語言開發,不利於二次開發和維護。

RocketMQ 和 Kafka 性能差不多,基於 Topic 的訂閱模式。RocketMQ 支持分佈式事務,但在集群下主從不能自動切換,導致了一些小問題。RocketMQ 使用的集群是 Master-Slave ,在 Master 沒有宕機時,Slave 作為災備,空閑着機器。而 Kafka 採用的是 Leader-Slave 無狀態集群,每台服務器既是 Master 也是 Slave。

Kafka 相關概念

在高可用環境中,Kafka 需要部署多台,避免 Kafka 宕機后,服務無法訪問。Kafka集群中每一台 Kafka 機器就是一個 Broker。Kafka 主題名稱和 Leader 的選舉等操作需要依賴 ZooKeeper。

同樣地,為了避免 ZooKeeper 宕機導致服務無法訪問,ZooKeeper 也需要部署多台。生產者的數據是寫入到 Kafka 的 Leader 節點,Follower 節點的 Kafka 從 Leader 中拉取數據同步。在寫數據時,需要指定一個 Topic,也就是消息的類型。

一個主題下可以有多個分區,數據存儲在分區下。一個主題下也可以有多個副本,每一個副本都是這個主題的完整數據備份。Producer 生產消息,Consumer 消費消息。在沒給 Consumer 指定 Consumer Group 時會創建一個臨時消費組。Producer 生產的消息只能被同一個 Consumer Group 中的一個 Consumer 消費。

  • Broker:Kafka 集群中的每一個 Kafka 實例
  • Zookeeper:選舉 Leader 節點和存儲相關數據
  • Leader:生產者與消費者只跟 Leader Kafka 交互
  • Follower:Follower 從 Leader 中同步數據
  • Topic:主題,相當於發布的消息所屬類別
  • Producer:消息的生產者
  • Consumer:消息的消費者
  • Partition:分區
  • Replica:副本
  • Consumer Group:消費組

分區、副本、消費組

  • 分區

主題的數據會按分區數分散存到分區下,把這些分區數據加起來才是一個主題的完整的數據。分區數最好是副本數的整數倍,這樣每個副本分配到的分區數比較均勻。同一個分區寫入是有順序的,如果要保證全局有序,可以只設置一個分區。

如果分區數小於消費者數,前面的消費者會配到一個分區,後面超過分區數的消費者將無分區可消費,除非前面的消費者宕機了。如果分區數大於消費者數,每個消費者至少分配到一個分區的數據,一些分配到兩個分區。這時如果有新的消費者加入,會把有兩個分區的調一個分配到新的消費者。

分區數可以設置成 6、12 等數值。比如 6,當消費者只有一個時,這 6 個分區都歸這個消費者,後面再加入一個消費者時,每個消費者都負責 3 個分區,後面又加入一個消費者時,每個消費者就負責 2 個分區。每個消費者分配到的分區數是一樣的,可以均勻地消費。

  • 副本

主題的副本數即數據備份的個數,如果副本數為 1 , 即使 Kafka 機器有多個,當該副本所在的機器宕機后,對應的數據將訪問失敗。

集群模式下創建主題時,如果分區數和副本數都大於 1,主題會將分區 Leader 較均勻的分配在有副本的 Kafka 上。這樣客戶端在消費這個主題時,可以從多台機器上的 Kafka 消息數據,實現分佈式消費。

副本數不是越多越好,從節點需要從主節點拉取數據同步,一般設置成和 Kafka 機器數一樣即可。如果只需要用到高可用的話,可以採用 N+1 策略,副本數設置為 2,專門弄一台 Kafka 來備份數據。然後主題分佈存儲在 “N” 台 Kafka 上,”+1″ 台 Kafka 保存着完整的主題數據,作為備用服務。

Replicas 表示在哪些 Kafka 機器上有主題的副本,Isr 表示當前有副本的 Kafka 機器上還存活着的 Kafka 機器。主題分區中所涉及的 Leader Kafka 宕機時,會將宕機 Kafka 涉及的分區分配到其它可用的 Kafka 節點上。如下:

  • 消費組

每一個消費組記錄者各個主題分區的消費偏移量,在消費的時候,如果沒有指定消費組,會默認創建一個臨時消費組。生產者生產的消息只能被同一消費組下某個消費者消費。如果想要一條消息可以被多個消費者消費,可以加入不同的消費組。

偏移量最大值,消息存儲策略

  • 偏移量的最大值

long 類型最大值是(2^63)-1 (為什麼要減一呢?第一位是符號位,正的有262,負的有262,其中+0 和 -0 是相等的 , 只不過有的語言把0算到負裏面,有的語言把0算到正裏面)。 偏移量是一個 long 類型,除去負數,包含0,其最大值為 2^62。

  • 消息存儲策略

Kafka 配置項提供兩種策略, 一種是基於時間:log.retention.hours=168,另一種是基於大小:log.retention.bytes=1073741824 。符合條件的數據會被標記為待刪除,Kafka會在恰當的時候才真正刪除。

Zookeeper 上存的 Kafka 相關數據

如何確保消息只被消費一次

前面已經講到,同一主題里的分區數據,只能被相同消費組裡其中一個消費者消費。當有多個消費者同時消費同一主題時,將這些消費者都加入相同的消費組,這時生產者的消息只能被其中一個消費者消費。

重複消費和數據丟失問題

  • 生產者

生產者發送消息成功后,不等 Kafka 同步完成的確認,繼續發送下一條消息。在發的過程中如果 Leader Kafka 宕機了,但生產者並不知情,發出去的信息 Kafka 就收不到,導致數據丟失。解決方案是將 Request.Required.Acks 設置為 -1,表示生產者等所有副本都確認收到后才發送下一條消息。

Request.Required.Acks=0 表示發送消息即完成發送,不等待確認(可靠性低,延遲小,最容易丟失消息)

Request.Required.Acks=1 表示當 Leader 提交同步完成后才發送下一條消息

  • 消費者

消費者有兩種情況,一種是消費的時候自動提交偏移量導致數據丟失:拿到消息的同時偏移量加一,如果業務處理失敗,下一次消費的時候偏移量已經加一了,上一個偏移量的數據丟失了。

另一種是手動提交偏移量導致重複消費:等業務處理成功后再手動提交偏移量,有可能出現業務處理成功,偏移量提交失敗,那下一次消費又是同一條消息。

怎麼解決呢?這是一個 or 的問題,偏移量要麼自動提交要麼手動提交,對應的問題是要麼數據丟失要麼重複消費。如果消息要求實時性高,丟個一兩條沒關係的話可以選擇自動提交偏移量。如果消息一條都不能丟的話可以選擇手動提交偏移量,然後將業務設計成冪等,不管這條消息消費多少次最終和消費一次的結果一樣。

Linux Kafka 操作命令

  • 查看 Kafka 中 Topic
  • 查看 Kafka 詳情
  • 消費 Topic
  • 查看所有消費組
  • 查看消費組的消費情況

Windows 可視化工具 Kafka Tool

  • 配置 Hosts 文件
123.207.79.96 ZooKeeper-Kafka-01
  • 配置 Kafka Tool 連接信息

  • 查看 Kafka 主題數據

生產者和消費者使用代碼

  • 具體操作參考 github.com/wong-winnie/library

訂閱號:偉洪winnie

  • 訂閱號回復關鍵字【聊聊高併發高可用那些事】獲取專欄文章

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化

※回頭車貨運收費標準

OAuth + Security – 5 – Token存儲升級(數據庫、Redis)

PS:此文章為系列文章,建議從第一篇開始閱讀。

在我們之前的文章中,我們當時獲取到Token令牌時,此時的令牌時存儲在內存中的,這樣顯然不利於我們程序的擴展,所以為了解決這個問題,官方給我們還提供了其它的方式來存儲令牌,存儲到數據庫或者Redis中,下面我們就來看一看怎麼實現。

不使用Jwt令牌的實現

  • 存儲到數據庫中(JdbcTokenStore)

使用數據庫存儲方式之前,我們需要先準備好對應的表。Spring Security OAuth倉庫可以找到相應的腳本:https://github.com/spring-projects/spring-security-oauth/blob/master/spring-security-oauth2/src/test/resources/schema.sql。該腳本為HSQL,所以需要根據具體使用的數據庫進行相應的修改,以MySQL為例,並且當前項目中,只需要使用到oauth_access_token和oauth_refresh_token數據表,所以將創建這兩個庫表的語句改為MySQL語句:

CREATE TABLE oauth_access_token (
	token_id VARCHAR ( 256 ),
	token BLOB,
	authentication_id VARCHAR ( 256 ),
	user_name VARCHAR ( 256 ),
	client_id VARCHAR ( 256 ),
	authentication BLOB,
	refresh_token VARCHAR ( 256 ) 
);

CREATE TABLE oauth_refresh_token ( 
token_id VARCHAR ( 256 ), 
token BLOB, authentication BLOB 
);

然後我們需要去配置對應的認證服務器,主要就是修改之前文章中TokenConfigure類中的tokenStore()方法:

// 同時需要注入數據源
@Autowired
private DataSource dataSource;
    
@Bean
    public TokenStore tokenStore() {
        return new JdbcTokenStore(dataSource);
    }

同時需要添加jdbc的依賴:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>

認證服務器中的配置保持本系列第一章的配置不變,此處不再贅述。

其中若有不理解的地方,請參考該系列的第一篇文章

關於數據源的補充:

在我們項目中配置的數據源,可能不一定是使用的官方提供的格式,比如我們自定義的格式,或者使用第三方的數據源,那麼我們如何去配置呢?這裏以mybatis-plus的多數據源為例:

@Configuration
@EnableAuthorizationServer
public class FebsAuthorizationServerConfigure extends AuthorizationServerConfigurerAdapter {

    ......
    
    @Autowired
    private DynamicRoutingDataSource dynamicRoutingDataSource;

    @Bean
    public TokenStore tokenStore() {
        DataSource dimplesCloudBase = dynamicRoutingDataSource.getDataSource("dimples_cloud_base");
        return new JdbcTokenStore(febsCloudBase);
    }
    ......
}

然後啟動項目,重新獲取Token進行測試,能正確的獲取到Token:

我們查看數據中,看是否已經存入相關信息到數據庫中:

  • 存儲到redis(RedisTokenStore)

令牌存儲到redis中相比於存儲到數據庫中來說,存儲redis中,首先在性能上有一定的提升,其次,令牌都有有效時間,超過這個時間,令牌將不可再用,而redis的可以給對應的key設置過期時間,完美切合需求,所有令牌存儲到redis中也是一種值得使用的方法。

首先,我們需要在項目中添加redis依賴,同時配置redis

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

新建配置類RedisConfigure

@Configuration
public class RedisConfigure{

    @Bean
    @ConditionalOnClass(RedisOperations.class)
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);

        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(mapper);

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // key採用 String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        // hash的 key也採用 String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        // value序列化方式採用 jackson
        template.setValueSerializer(jackson2JsonRedisSerializer);
        // hash的 value序列化方式採用 jackson
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();

        return template;
    }
    
}

配置redis的連接

spring:
  redis:
    database: 0
    host: 127.0.0.1
    port: 6379
    lettuce:
      pool:
        min-idle: 8
        max-idle: 500
        max-active: 2000
        max-wait: 10000
    timeout: 5000

這時我們啟動項目測試,會發現項目將會報錯:

Caused by: java.lang.ClassNotFoundException: org.apache.commons.pool2.impl.GenericObjectPoolConfig

這是由於我們配置RedisConfigure時,使用到了一個ObjectMapper類,這個類需要我們引入Apache的一個工具包

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

最後,我們需要在認證服務器中配置token的存儲方式,還是同jdbc的配置,在tokenStore()方法中配置,打開我們的TokenConfigure類,然後做如下配置:

@Autowired
private RedisConnectionFactory redisConnectionFactory;

@Bean
public TokenStore tokenStore() {
    return new RedisTokenStore(redisConnectionFactory);
}

認證服務器中的配置還是跟之前的一樣,保持不變,啟動項目,獲取Token測試:

我們查看Redis中,看是否已經存入相關信息到數據庫中:

  • 其它

我們打開TokenStore接口的實現類,會發現,還有一種JwkTokenStore,這種實際上就是將我們UUID格式令牌變成可以帶特殊含義的jwt格式的令牌,我們已經在第三篇文章中介紹過了,可以參考前面的文章。

但是我們需要明白一點的是,這種令牌還是存儲在內存中的,後期我們如何將其存儲到redis中是我們研究的方向。

在上面的token存儲到數據庫和存儲到redis中,我們會發現一個問題,那就是我們多次獲取令牌,但是其值是固定不變的,為了解決這個問題,我們可以使用如下方式解決:

@Bean
public TokenStore tokenStore() {
    RedisTokenStore redisTokenStore = new RedisTokenStore(redisConnectionFactory);
    // 解決每次生成的 token都一樣的問題
    redisTokenStore.setAuthenticationKeyGenerator(oAuth2Authentication -> UUID.randomUUID().toString());
    return redisTokenStore;
}

使用JWT令牌的實現

在之前的所有使用方法中,我們要麼是將Token存儲在數據庫或者redis中的時候,令牌的格式是UUID的無意義字符串,或者是使用JWT的有意義字符串,但是確是將令牌存儲在內存中,那麼,我們現在想使用JWT令牌的同時,也想將令牌存儲到數據庫或者Redis中。我們該怎麼配置呢?下面我們以Redis存儲為例,進行探索:

首先,我們還是要使用到TokenConfigure中的JWT和Redis配置:

@Autowired
private RedisConnectionFactory redisConnectionFactory;

@Bean
public JwtAccessTokenConverter accessTokenConverter() {
    JwtAccessTokenConverter converter = new JwtAccessTokenConverter();
    //對稱秘鑰,資源服務器使用該秘鑰來驗證
    converter.setSigningKey(SIGNING_KEY);
    return converter;
}

@Bean
public TokenStore tokenStore() {
    return new RedisTokenStore(redisConnectionFactory);
}

接下來,是配置認證服務器,在認證服務器中,跟之前的配置有一點區別,如下:

@Autowired
private JwtAccessTokenConverter jwtAccessTokenConverter;

@Autowired
private TokenStore tokenStore;

@Override
public void configure(AuthorizationServerEndpointsConfigurer endpoints) {
    endpoints
            // 配置密碼模式管理器
            .authenticationManager(authenticationManager)
            // 配置授權碼模式管理器
            .authorizationCodeServices(authorizationCodeServices())
            // 令牌管理
//                .tokenServices(tokenServices());
            .accessTokenConverter(jwtAccessTokenConverter)
            .tokenStore(tokenStore);

}

啟動項目,進行測試,獲取token

然後使用該令牌請求資源

至此,我們差不多完成了Token令牌的存儲和獲取

源碼傳送門: https://gitee.com/dimples820/dimples-explore

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※別再煩惱如何寫文案,掌握八大原則!

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※回頭車貨運收費標準

台中搬家公司費用怎麼算?

IEA:再生能源對武漢肺炎適應力最佳 將是滿足2020需求增長的唯一能源

環境資訊中心綜合外電;姜唯 編譯;林大利 審校

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※回頭車貨運收費標準

LG印度工廠化學氣體外洩已11死 村民抬遺體抗議要求關廠

摘錄自2020年5月10日自由時報報導

印度東南部維沙卡帕特南(Visakhapatnam)7日時,南韓LG集團子公司的聚合物工廠發生苯乙烯(styrene)氣體大規模外洩,據外媒,目前已知最少11死,逾千人住院,現仍有數百人在院內接受治療。當地時間週六,憤怒的村民將受害者遺體陳列於工廠大門前,要求工廠即刻關閉,同時逮捕工廠最高管理階層。

這間工廠屬於「南韓樂金聚合物(LG Polymers)」為南韓最大化工企業「LG化學(LG Chem Ltd)」的子公司。同日該公司發表聲明向受害者道歉,聲明也透露,初步調查結果證實,這場悲劇是由苯乙烯儲存槽蒸氣外洩而引起的。

苯乙烯被廣泛用於塑膠及橡膠製品製造,據我國勞動部職業安全衛生署2015「職業性苯乙烯、二苯乙烯中毒認定參考指引」,吸入或食入高濃度苯乙烯可能刺激呼吸道,引起昏睡、頭痛、精神混亂、協調感喪失及意識不清等症狀,若食入或在嘔吐下吸入肺部,可能導致嚴重肺組織損傷,引起肺組織壞死,甚至致死。

公害污染
空氣污染
毒物
污染治理
國際新聞
印度
化學工廠
化學氣體

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※推薦台中搬家公司優質服務,可到府估價

不只陸上生物會感染 科學家擔憂武漢肺炎恐可傳「鯨魚」

摘錄自2020年5月9日自由時報報導

武漢肺炎(新型冠狀病毒疾病,COVID-19)衝擊全球,不僅導致近400萬人染疫,與人類親近的犬、貓,甚至貂和大型貓科動物都有確診案例;專家則警告,雖目前還未有正式的科學期刊證實,但有研究顯示武漢肺炎可感染的潛在生物範圍恐擴及「鯨魚」。

根據加國媒體《Nunatsiaq news 》報導,目前科學界認為其他動物也會感染武漢肺炎的原因在於,牠們呼吸道細胞表面的蛋白質類似武漢肺炎的受體ACE2,當武漢肺炎病毒進入體內時,會經由ACE2進入細胞並開始複製,進而侵入呼吸道,造成感染。日前加利福尼亞大學戴維斯分校的研究人員創建了一份ACE2受體的動物清單,雖目前仍須更多研究佐證,但專家恩威亞(Martin Nweeia)日前在威爾遜國際學者中心的討論會上強調,北極海域的海洋哺乳動物有感染武漢肺炎的可能性。

雖然人類多透過飛沫感染到武漢肺炎,但根據國際文獻指出,人的糞便、尿液也會殘留病毒,當這些含有病毒的廢水排入水中時,有可能導致某些動物感染,雖然病毒可能被海水中的某些物質破壞,但是北極海有多個鹹淡水入口,以及因全球暖化冰川徑流和夏季冰融化增加了淡水系統,可能影響海洋生物以及病毒的存活。

生活環境
生態保育
物種保育
生物多樣性
國際新聞
武漢肺炎
鯨魚
動物與大環境變遷
公共衛生

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

網頁設計最專業,超強功能平台可客製化

疫情衝擊酪農 日本北海道知事推牛奶挑戰

摘錄自2020年5月10日中央通訊社報導

日本境內2019冠狀病毒疾病(COVID-19)疫情延燒,連帶衝擊到北海道酪農生乳銷量,讓北海道知事鈴木直道推出「牛奶挑戰」,呼籲民眾多喝牛奶,增加牛奶或優格購買量。日本放送協會(NHK)報導,鈴木直道在一段影片中單手拿著一杯牛奶,然後一口氣喝完,這個稱為「牛奶挑戰」的活動,希望能提升牛奶消費量。

日本境內受到疫情影響,學校臨時停課,所以沒有提供營養午餐,讓牛奶消費量下滑;民眾避免外出及出遊,也造成牛奶製品的冰淇淋、蛋糕等甜點銷量銳減。牛奶消費量減少直接衝擊酪農,而北海道的農業生產額中,酪農占了4成,在北海道農產占有相當重要地位。

一般來說,需求減少可以減產因應,但事實上不然,北海道江別市酪農川口谷仁說,每天要幫牛擠3次奶,如果不這麼做,牛就會生病,所以無法停止擠奶。加上北海道每年5月到6月的氣候,對牛來說是最舒服的時候,也是一年中最容易增加生乳產量的時期。而再這樣下去的話,最糟的情況就是造成廠商無法再收購,每天固定生產的生乳將不得不廢棄。

生活環境
國際新聞
日本
北海道
酪農
牛奶
武漢肺炎
疫情下的食衣住行
公共衛生
經濟動物
動物福利

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計最專業,超強功能平台可客製化

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※回頭車貨運收費標準

※推薦評價好的iphone維修中心

※教你寫出一流的銷售文案?

台中搬家公司教你幾個打包小技巧,輕鬆整理裝箱!

台中搬家公司費用怎麼算?

給非目標魚類的「逃生指示燈」 研究:漁網裝LED燈 可使混獲減半

環境資訊中心綜合外電;姜唯 編譯;林大利 審校

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※回頭車貨運收費標準

※推薦評價好的iphone維修中心

※超省錢租車方案

台中搬家遵守搬運三大原則,讓您的家具不再被破壞!

※推薦台中搬家公司優質服務,可到府估價

疫情讓自然資源復甦 泰國家公園擬每年關3個月

摘錄自2020年5月11日中央社報導

為了避免武漢肺炎疫情擴大,泰國從3月25日關閉所有的國家公園,例如知名的考艾國家公園(Khao Yai National Park)是開園58年以來第一次閉園,而且數條穿過國家公園的公路也因此封閉。

泰國公視(Thai PBS)報導,泰國自然資源和環境部(Ministry of Natural Resources and Environment)部長烏拉沃(Varawut Silpa-archa)日前表示,過去兩個月國家公園因為疫情閉園,反讓許多野生動物再生。

烏拉沃表示,受到這樣狀況的啟發,自然資源和環境部未來準備讓全泰國157個國家公園每年閉園三個月,他要求國家公園局(Department of National Parks, Wildlife and Plant Conservation)在一切恢復正常後,研擬適合的閉園時間表。

泰媒經理人日報(Manager Daily)報導,國家公園局局長譚亞(Thanya Netithamkul)表示,國家公園局已經和相關單位討論過,會要求各個國家公園準備相關計畫。

生態保育
生物多樣性
國際新聞
泰國
國家公園
疫情

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※回頭車貨運收費標準

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※推薦評價好的iphone維修中心

※教你寫出一流的銷售文案?

台中搬家公司教你幾個打包小技巧,輕鬆整理裝箱!

台中搬家遵守搬運三大原則,讓您的家具不再被破壞!

基於Azure IoT開發.NET物聯網應用系列-全新的Azure IoT架構

物聯網技術已經火了很多年了,業界各大廠商都有各自成熟的解決方案。我們公司主要搞新能源汽車充電,充電樁就是物聯網技術的最大應用,車聯網、物聯網、互聯網三網合一。2017年的時候重點研究過Azure IoT技術架構和使用,

Azure IoT 技術研究系列1-入門篇

隨着業界技術的發展,近期又重新關注並研究了最新的Azure  IoT架構,現在將結合著.NET Core技術和Azure IoT 做一些物聯網應用,將研究的成果分享給大家。

關於IoT的一些基本概念,重新梳理一下,分享給大家:

  • IoT:Internet of Things,即萬物互聯。
  • IoT Devices:物聯網設備。
  • IoT Edge Devices:物聯網邊緣計算設備。
  • IoT Gateway:IoT網關,負責IoT物聯網設備的接入、管理和控制、通訊(上行和下行)
  • 通訊協議:TCP、MQTT、AMQP、HTTPS、zgebee等等
  • Azure IoT Central。 IoT Central 是完全託管的 IoT物聯網 SaaS(軟件即服務)服務
  • 目前Azure仍然提供了Azure IoT Hub:直譯為Azure的物聯網中心
  • Azure IoT Hub為物聯網設備提供註冊、管理、溝通交互的雲服務。可用於管理數十億物聯網設備,提供可靠和安全的雲端與設備之間的雙向通信支持,每月可處理數以萬億計消息,並簡化了與其他Azure服務之間的集成,包括Azure機器學習以及
  • Azure流分析等。它是微軟Azure IoT Suite的重要組成部分,也是微軟物聯網戰略的重要基礎。

接下來,我們看一下Azure IoT最新的技術架構:

   

   下面,我們詳細介紹一下這個架構組成:

   一、 Things(物聯網設備側)

  1. IoT devices:前面已經介紹過了,泛指各類物聯網設備。設備可以安全地註冊到雲中,並且可以連接到雲之後,發送和接收數據。

  2. IoT edge devices:物聯網邊緣計算設備,某些設備可能會是在設備本身上或在現場網關中執行一些數據處理的邊緣設備。舉個大家平時常見的設備:充電樁,作為IoT邊緣計算設備,其自身有嵌入式操作系統、AI智能芯片,可以實現一些簡單的邊緣計算場景

  3. Cloud Gateway:雲網關,雲網關提供一個雲中心,以便設備安全地連接到雲併發送數據。 它還提供設備管理功能,包括設備的命令和控制。

      對於雲網關,Azure 建議使用Azure  IoT 中心。Azure IoT 中心是從設備引入事件的託管雲服務,充當設備與後端服務之間的消息代理。 同時提供安全連接、事件引入、雙向通信和設備管理。

      當然,我們也可以自建雲網關,支持各類物聯網設備的接入、管理和控制。

  4. Bulk devices provisioning:設備批量設置,統一管理設置海量設備。 對於註冊和連接許多組設備。可以使用 IoT 中心設備預配服務 (DPS)。 DPS 可用於大規模分配設備並將設備註冊到特定 Azure IoT 中心終結點。

二、Insights(洞察、洞見,可以理解為設備接入管理、數據處理、數據持久化、數據分析、可視化)

  1. Streaming Processing:流式數據處理

  Azure提供了專門的流分析服務。 流分析可以使用時間開窗函數、流聚合和外部數據源聯接大規模執行複雜分析。假如說我們自建系統做物聯網數據流式分析的話,可以使用Kafka、Flink、Spark等主流的大數據流式分析技術。

  2. Data transformation:數據轉換操作或聚合遙測數據流。

  常見的場景包括通訊協議轉換,例如,將二進制數據轉換為 JSON,或者合併數據點。 如果數據在到達 IoT 中心之前必須轉換,可以使用協議網關(一個可以轉換數據的網關)。 同時,數據可以在到達 IoT 中心後轉換。

  在這種情況下,可以使用 Azure Functions 函數計算,Azure Functions內置了與 IoT 中心、Cosmos DB 和 Blob 存儲的集成。

  3. Warm path store:熱存儲

  熱存儲,存儲實時物聯網設備上傳下發的數據,這些數據必須可按設備實時查詢,以用於報告和可視化。舉個實際的業務場景:充電樁實時上傳的電壓、電流、SOC等實時設備數據,這些數據的實時性要求高,可以存儲在熱存儲中。

  4. Cold path store:冷存儲

  如果所有的物聯網設備數據全部存儲在熱存儲中,其硬件成本會很高。數據具備一定的時效性,因為,當數據失去了一定的時效性要求后,可以存儲在冷存儲中,降低存儲的成本。

  這些數據會保留較長時間,用於批處理。 對於冷路徑存儲,可以使用 Azure Blob 存儲。 數據可無限期地以較低成本在 Blob 存儲中存檔,並且可以輕鬆訪問以進行批處理。

  5. UI Reporting and tools:可視化展現

  可視化展現方面,通常包含:IoT設備管理UI、設備控制UI、趨勢圖、連接狀態圖表、數據分析圖表等等,這個地方可以使用各類UI展現技術實現了。

三、 Action(運維管理、操作)

  1. Machine Learning:機器學習

   大家會問,用機器學習干什麼?通過歷史遙測數據執行模型訓練,實現IoT設備的預測性維護,同時還能做什麼?還可以對上報的數據建立不同的模型,實時進行訓練,智能控制設備。比如說充電樁的例子,動態調控充電功率,實現最大充電效率。

  2. Business integration:業務流程集成

   業務流程集成根據來自設備數據執行各類後續操作。 可以包括:存儲實時消息、引發警報、發送电子郵件或短信,或者與 CRM 集成。舉個實際的業務場景:當需要設備運維時,發出一個運維工單到產品運維部門,實現IoT設備的智能運維和派單處理。

  3. User Management:用戶管理

   用戶管理限制哪些用戶或組可以在設備上執行操作,例如升級固件。 它還定義應用程序中的用戶功能。

  綜上是Azure IoT架構的詳細介紹和說明,比2017年時,產品更加SaaS化,更加AI智能、更加體系。分享給大家。

 

 

周國慶

2020/6/7

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※回頭車貨運收費標準

網頁設計最專業,超強功能平台可客製化

※別再煩惱如何寫文案,掌握八大原則!

異步函數async await在wpf都做了什麼?

首先我們來看一段控制台應用代碼:

 class Program
 {
     static async Task Main(string[] args)
     {
        System.Console.WriteLine($"Thread Id is Thread:{Thread.CurrentThread.ManagedThreadId},Is Thread Pool:{Thread.CurrentThread.IsThreadPoolThread}");
        var result = await ExampleTask(2);
        System.Console.WriteLine($"Thread Id is Thread:{Thread.CurrentThread.ManagedThreadId},Is Thread Pool:{Thread.CurrentThread.IsThreadPoolThread}");
        System.Console.WriteLine(result);
        Console.WriteLine("Async Completed");
     }

     private static async Task<string> ExampleTask(int Second)
     {
        await Task.Delay(TimeSpan.FromSeconds(Second));
        return $"It's Async Completed in {Second} seconds";
     }
 }

輸出結果

Thread Id is Thread:1,Is Thread Pool:False
Thread Id is Thread:4,Is Thread Pool:True
It's Async Completed in 2 seconds
Async Completed

如果這段代碼在WPF運行,猜猜會輸出啥?

      private async void Async_Click(object sender, RoutedEventArgs e)
      {
          Debug.WriteLine($"Thread Id is Thread:{Thread.CurrentThread.ManagedThreadId},Is Thread Pool:{Thread.CurrentThread.IsThreadPoolThread}");
          var result= await ExampleTask(2);
          Debug.WriteLine($"Thread Id is Thread:{Thread.CurrentThread.ManagedThreadId},Is Thread Pool:{Thread.CurrentThread.IsThreadPoolThread}");
          Debug.WriteLine(result);
          Debug.WriteLine("Async Completed");   
      }

      private async Task<string> ExampleTask(int Second)
      {
          await Task.Delay(TimeSpan.FromSeconds(Second));
          return $"It's Async Completed in {Second} seconds";
      }

輸出結果:

Thread Id is Thread:1,Is Thread Pool:False
Thread Id is Thread:1,Is Thread Pool:False
It's Async Completed in 2 seconds
Async Completed

這時候你肯定是想說,小朋友,你是否有很多問號????,我們接下看下去

一.SynchronizationContext(同步上下文)

首先我們知道async await 異步函數本質是狀態機,我們通過反編譯工具dnspy,看看反編譯的兩段代碼是否有不同之處:

控制台應用:

internal class Program
{
    [DebuggerStepThrough]
	private static Task Main(string[] args)
	{
		Program.<Main>d__0 <Main>d__ = new Program.<Main>d__0();
		<Main>d__.args = args;
		<Main>d__.<>t__builder = AsyncTaskMethodBuilder.Create();
		<Main>d__.<>1__state = -1;
		<Main>d__.<>t__builder.Start<Program.<Main>d__0>(ref <Main>d__);
		return <Main>d__.<>t__builder.Task;
	}
    
	[DebuggerStepThrough]
	private static Task<string> ExampleTask(int Second)
	{
		Program.<ExampleTask>d__1 <ExampleTask>d__ = new Program.<ExampleTask>d__1();
		<ExampleTask>d__.Second = Second;
		<ExampleTask>d__.<>t__builder = AsyncTaskMethodBuilder<string>.Create();
		<ExampleTask>d__.<>1__state = -1;
		<ExampleTask>d__.<>t__builder.Start<Program.<ExampleTask>d__1>(ref <ExampleTask>d__);
		return <ExampleTask>d__.<>t__builder.Task;
	}

	[DebuggerStepThrough]
	private static void <Main>(string[] args)
	{
	        Program.Main(args).GetAwaiter().GetResult();
	}
}

WPF:

public class MainWindow : Window, IComponentConnector
{

	public MainWindow()
	{
	       this.InitializeComponent();
	}

	[DebuggerStepThrough]
	private void Async_Click(object sender, RoutedEventArgs e)
	{
		MainWindow.<Async_Click>d__1 <Async_Click>d__ = new MainWindow.<Async_Click>d__1();
		<Async_Click>d__.<>4__this = this;
		<Async_Click>d__.sender = sender;
		<Async_Click>d__.e = e;
		<Async_Click>d__.<>t__builder = AsyncVoidMethodBuilder.Create();
		<Async_Click>d__.<>1__state = -1;
		<Async_Click>d__.<>t__builder.Start<MainWindow.<Async_Click>d__1>(ref <Async_Click>d__);
	}

	[DebuggerStepThrough]
	private Task<string> ExampleTask(int Second)
	{
	        MainWindow.<ExampleTask>d__3 <ExampleTask>d__ = new MainWindow.<ExampleTask>d__3();
		<ExampleTask>d__.<>4__this = this;
		<ExampleTask>d__.Second = Second;
		<ExampleTask>d__.<>t__builder = AsyncTaskMethodBuilder<string>.Create();
		<ExampleTask>d__.<>1__state = -1;
		<ExampleTask>d__.<>t__builder.Start<MainWindow.<ExampleTask>d__3>(ref <ExampleTask>d__);
		return <ExampleTask>d__.<>t__builder.Task;
	}

	[DebuggerNonUserCode]
	[GeneratedCode("PresentationBuildTasks", "4.8.1.0")]
	public void InitializeComponent()
	{
		bool contentLoaded = this._contentLoaded;
		if (!contentLoaded)
		{
		     this._contentLoaded = true;
		     Uri resourceLocater = new Uri("/WpfApp1;component/mainwindow.xaml", UriKind.Relative);
		     Application.LoadComponent(this, resourceLocater);
		}
	}
	private bool _contentLoaded;
}

我們可以看到完全是一致的,沒有任何區別,為什麼編譯器生成的代碼是一致的,卻會產生不一樣的結果,我們看看創建和啟動狀態機代碼部分的實現:

public static AsyncVoidMethodBuilder Create()
{
	SynchronizationContext synchronizationContext = SynchronizationContext.Current;
	if (synchronizationContext != null)
	{
		synchronizationContext.OperationStarted();
	}
	return new AsyncVoidMethodBuilder
	{
		_synchronizationContext = synchronizationContext
	};
}

[DebuggerStepThrough]
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Start<[Nullable(0)] TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine
{
	AsyncMethodBuilderCore.Start<TStateMachine>(ref stateMachine);
}

[DebuggerStepThrough]
public static void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine
{
	if (stateMachine == null)
	{
		ThrowHelper.ThrowArgumentNullException(ExceptionArgument.stateMachine);
	}
	Thread currentThread = Thread.CurrentThread;
	Thread thread = currentThread;
	ExecutionContext executionContext = currentThread._executionContext;
	ExecutionContext executionContext2 = executionContext;
	SynchronizationContext synchronizationContext = currentThread._synchronizationContext;
	try
	{
	     stateMachine.MoveNext();//狀態機執行代碼
	}
	finally
	{
	     SynchronizationContext synchronizationContext2 = synchronizationContext;
	     Thread thread2 = thread;
	     if (synchronizationContext2 != thread2._synchronizationContext)
	     {
		  thread2._synchronizationContext = synchronizationContext2;
	     }
	     ExecutionContext executionContext3 = executionContext2;
	     ExecutionContext executionContext4 = thread2._executionContext;
	     if (executionContext3 != executionContext4)
	     {
		 ExecutionContext.RestoreChangedContextToThread(thread2, executionContext3, executionContext4);
	     }
	}
}

在這裏總結下:

  • 創建狀態機的Create函數通過SynchronizationContext.Current獲取到當前同步執行上下文
  • 啟動狀態機的Start函數之後通過MoveNext函數執行我們的異步方法
  • 這裏還有一個小提示,不管async函數裏面有沒有await,都會生成狀態機,只是MoveNext函數執行同步方法,因此沒await的情況下避免將函數標記為async,會損耗性能

同樣的這裏貌似沒能獲取到原因,但是有個很關鍵的地方,就是Create函數為啥要獲取當前同步執行上下文,之後我從MSDN找到關於SynchronizationContext
的介紹,有興趣的朋友可以去閱讀以下,以下是各個.NET框架使用的SynchronizationContext:

SynchronizationContext 默認
WindowsFormsSynchronizationContext WindowsForm
DispatcherSynchronizationContext WPF/Silverlight
AspNetSynchronizationContext ASP.NET

我們貌似已經一步步接近真相了,接下來我們來看看DispatcherSynchronizationContext

二.DispatcherSynchronizationContext

首先來看看DispatcherSynchronizationContext類的比較關鍵的幾個函數實現:

public DispatcherSynchronizationContext(Dispatcher dispatcher, DispatcherPriority priority)
{
     if (dispatcher == null)
     {
         throw new ArgumentNullException("dispatcher");
     }
     Dispatcher.ValidatePriority(priority, "priority");
     _dispatcher = dispatcher;
     _priority = priority;
     SetWaitNotificationRequired();
 }

//同步執行
public override void Send(SendOrPostCallback d, object state)
{
     if (BaseCompatibilityPreferences.GetInlineDispatcherSynchronizationContextSend() && _dispatcher.CheckAccess())
     {
         _dispatcher.Invoke(DispatcherPriority.Send, d, state);
     }
     else
     {
          _dispatcher.Invoke(_priority, d, state);
     }
}

//異步執行
public override void Post(SendOrPostCallback d, object state)
{
     _dispatcher.BeginInvoke(_priority, d, state);
}

我們貌似看到了熟悉的東西了,Send函數調用Dispatcher的Invoke函數,Post函數調用Dispatcher的BeginInvoke函數,那麼是否WPF執行異步函數之後會調用這裏的函數嗎?我用dnspy進行了調試:

我通過調試之後發現,當等待執行完整個狀態機的之後,也就是兩秒后跳轉到該Post函數,那麼,我們可以將之前的WPF那段代碼大概可以改寫成如此:

private async void Async_Click(object sender, RoutedEventArgs e)
{
    //async生成狀態機的Create函數。獲取到UI主線程的同步執行上下文
    DispatcherSynchronizationContext synchronizationContext = (DispatcherSynchronizationContext)SynchronizationContext.Current;
    
    //UI主線程執行
    Debug.WriteLine($"Thread Id is Thread:{Thread.CurrentThread.ManagedThreadId},Is Thread Pool:{Thread.CurrentThread.IsThreadPoolThread}");
    
    //開始在狀態機的MoveNext執行該異步操作
    var result= await ExampleTask(2);
    
    //等待兩秒,異步執行完成,再在同步上下文異步執行
    synchronizationContext.Post((state) =>
    {
         //模仿_dispatcher.BeginInvoke
         Debug.WriteLine($"Thread Id is Thread:{Thread.CurrentThread.ManagedThreadId},Is Thread Pool:{Thread.CurrentThread.IsThreadPoolThread}");
         Debug.WriteLine(result);
         Debug.WriteLine("Async Completed");  
     },"Post");           
 }

輸出結果:

Thread Id is Thread:1,Is Thread Pool:False
Thread Id is Thread:1,Is Thread Pool:False
It's Async Completed in 2 seconds
Async Completed

也就是asyn負責生成狀態機和執行狀態機,await將代碼分為兩部分,一部分是異步執行狀態機部分,一部分是異步執行完之後,通過之前拿到的DispatcherSynchronizationContext,再去異步執行接下來的部分。我們可以通過dnspy調試DispatcherSynchronizationContext的 _dispatcher字段的Thread屬性,知道Thread為UI主線程,而同步界面UI控件的時候,也就是通過Dispatcher的BeginInvoke函數去執行同步的

三.Task.ConfigureAwait

Task有個ConfigureAwait方法,是可以設置是否對Task的awaiter的延續任務執行原始上下文,也就是為true時,是以一開始那個UI主線程的DispatcherSynchronizationContext執行Post方法,而為false,則以await那個Task裏面的DispatcherSynchronizationContext執行Post方法,我們來驗證下:

我們將代碼改為以下:

private async void Async_Click(object sender, RoutedEventArgs e)
{
    Debug.WriteLine($"Thread Id is Thread:{Thread.CurrentThread.ManagedThreadId},Is Thread Pool:{Thread.CurrentThread.IsThreadPoolThread}");
    var result= await ExampleTask(2).ConfigureAwait(false);
    Debug.WriteLine($"Thread Id is Thread:{Thread.CurrentThread.ManagedThreadId},Is Thread Pool:{Thread.CurrentThread.IsThreadPoolThread}");
    Debug.WriteLine(result);
    Debug.WriteLine($"Async Completed");
}

輸出:

Thread Id is Thread:1,Is Thread Pool:False
Thread Id is Thread:4,Is Thread Pool:True
It's Async Completed in 2 seconds
Async Completed

結果和控制台輸出的一模一樣,且通過dnspy斷點調試依舊進入到DispatcherSynchronizationContext的Post方法,因此我們也可以證明我們上面的猜想,而且默認ConfigureAwait的參數是為true的,我們還可以將異步結果賦值給UI界面的Text block:

private async void Async_Click(object sender, RoutedEventArgs e)
{
    Debug.WriteLine($"Thread Id is Thread:{Thread.CurrentThread.ManagedThreadId},Is Thread Pool:{Thread.CurrentThread.IsThreadPoolThread}");
    var result= await ExampleTask(2).ConfigureAwait(false);
    Debug.WriteLine($"Thread Id is Thread:{Thread.CurrentThread.ManagedThreadId},Is Thread Pool:{Thread.CurrentThread.IsThreadPoolThread}");
    this.txt.Text = result;//修改部分
    Debug.WriteLine($"Async Completed");
}

拋出異常:

調用線程無法訪問此對象,因為另一個線程擁有該對象

補充
推薦林大佬的一篇文章,也講的也簡潔透徹C# dotnet 自己實現一個線程同步上下文

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

※超省錢租車方案

※回頭車貨運收費標準