ReactiveX 是一个专注于异步编程与控制可观察数据(或者事件)流的API。它组合了观察者模式,迭代器模式和函数式编程的优秀思想。 实时数据处理是一件普通的现象,有一个高效、干净和可扩展的方式来处理这些情景是重要的。使用 Observables 和 Operators 来熟练操作它们。ReactiveX 提供一个可组合又灵活的 API 来创建和处理数据流,同时简化了异步编程带来的一些担忧,如:线程创建和并发问题。
RxJava是ReactiveX在Java上的开源的实现。Observable(观察者)和Subscriber(订阅者)是两个主要的类。在RxJava上,一个Observable是一个发出数据流或者事件的类,Subscriber是一个对这些发出的items(数据流或者事件)进行处理(采取行动)的类。一个Observable的标准流发出一个或多个item,然后成功完成或者出错。一个Observable可以有多个Subscribers,并且通过Observable发出的每一个item,该item将会被发送到Subscriber.onNext()方法来进行处理。一旦Observable不再发出items,它将会调用Subscriber.onCompleted()方法,或如果有一个出错的话Observable会调用Subscriber.onError()方法。现在,我们知道了很多关于Observable和Subscriber类,我们可以继续去介绍有关Observables 的创建和订阅。
Observable integerObservable = Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onCompleted();
}
});这个 Observable 发出了整数 1,2,3 然后结束了。现在我们需要创建一个 Subscriber,那样我们就能让这些发出的流起作用。
Subscriber integerSubscriber = new Subscriber() {
@Override
public void onCompleted() {
System.out.println("Complete!");
}
@Override
public void onError(Throwable e) { }
@Override
public void onNext(Integer value) {
System.out.println("onNext: " + value);
}
};我们的 Subscriber 只是简单的把任何发出的 items 打印出来,完成之后通知我们。一旦你有一个 Observable 和一个 Subscriber,可以通过 Observable.subscribe() 方法将他们联系起来。
integerObservable.subscribe(integerSubscriber); // Outputs: // onNext: 1 // onNext: 2 // onNext: 3 // Complete!
上面所有这些代码可以简单的通过使用 Observable.just() 方法来创建一个 Observable 去发出这些定义的值,并且我们的 Subscriber 可以改变成匿名的内部类,如下:
Observable.just(1, 2 ,3).subscribe(new Subscriber() {
@Override
public void onCompleted() {
System.out.println("Complete!");
}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(Integer value) {
System.out.println("onNext: " + value);
}
});RxJava地址:https://github.com/ReactiveX/RxJava
RxAndroid地址:https://github.com/ReactiveX/RxAndroid
通过请求openweathermap 的天气查询接口返回天气数据
1、增加编译依赖
dependencies {
compile fileTree(dir: 'libs', include: ['*.jar'])
compile 'com.android.support:appcompat-v7:22.0.0'
compile 'io.reactivex:rxjava:1.0.9'
compile 'io.reactivex:rxandroid:0.24.0'
compile 'com.squareup.retrofit:retrofit:1.9.0'
}retrofit 是一个 restful 请求客户端。详见:https://square.github.io/retrofit/
2、服务器接口
/**
* 接口
* Created by Hal on 15/4/26.
*/
public class ApiManager {
private static final String ENDPOINT = "https://api.openweathermap.org/data/2.5";
/**
* 服务接口
*/
private interface ApiManagerService {
@GET("/weather")
WeatherData getWeather(@Query("q") String place, @Query("units") String units);
}
private static final RestAdapter restAdapter = new RestAdapter.Builder()
.setEndpoint(ENDPOINT).setLogLevel(RestAdapter.LogLevel.FULL).build();
private static final ApiManagerService apiManager =
restAdapter.create(ApiManagerService.class);
/**
* 将服务接口返回的数据,封装成{@link rx.Observable}
* @param city
* @return
*/
public static Observable<WeatherData> getWeatherData(final String city) {
return Observable.create(new Observable.OnSubscribe<WeatherData>() {
@Override
public void call(Subscriber<? super WeatherData> subscriber) {
// 订阅者回调 onNext 和 onCompleted
subscriber.onNext(apiManager.getWeather(city, "metric"));
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io());
}
}订阅者的回调有三个方法,onNext,onError,onCompleted
3、接口调用
/**
* 多个 city 请求
* map,flatMap 对 Observable进行变换
*/
Observable.from(CITIES).flatMap(new Func1<String, Observable<WeatherData>>() {
@Override
public Observable<WeatherData> call(String s) {
return ApiManager.getWeatherData(s);
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<WeatherData>() { // onNext
@Override
public void call(WeatherData weatherData) {
Log.d(LOG_TAG, weatherData.toString());
}
}, new Action1<Throwable>() { // onError
@Override
public void call(Throwable throwable) {}
});
/**
* 单个 city 请求
*/
ApiManager.getWeatherData(CITIES[0]).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<WeatherData>() {
@Override
public void call(WeatherData weatherData) {
Log.d(LOG_TAG, weatherData.toString());
((TextView) findViewById(R.id.text)).setText(weatherData.toString());
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.e(LOG_TAG, throwable.getMessage(), throwable);
}
});
/**
* Android View 事件处理
*/
ViewObservable.clicks(findViewById(R.id.text), false).subscribe(new Action1<OnClickEvent>() {
@Override
public void call(OnClickEvent onClickEvent) {}
});subscribeOn(Schedulers.io())与observeOn(AndroidSchedulers.mainThread())分别定义了这两个动作的线程。Android UI 更新需要在主线程。
4、retrofit 支持 rxjava 整合
/**
* 服务接口
*/
private interface ApiManagerService {
@GET("/weather")
WeatherData getWeather(@Query("q") String place, @Query("units") String units);
/**
* retrofit 支持 rxjava 整合
* 这种方法适用于新接口
*/
@GET("/weather")
Observable<WeatherData> getWeatherData(@Query("q") String place, @Query("units") String units);
}