returning subscriber in RxJava after storing data fetch from webservice


returning subscriber in RxJava after storing data fetch from webservice



I am trying to call the web service to fetch the data and storing it into database using following code. I have created a separate class to perform following operation.



Now , the issue is i want to notify my activity when i successfully fetch and store data in database. if some error occurs then i want to show that on UI itself.



somehow i am able to write a code to fetch the data using pagination but not sure how would i notify UI where i can subscribe catch the update related to progress and error if any.


public Flowable<Response> getFitnessData() {

Request request = new Request();
request.setAccess_token("d80fa6bd6f78cc704104d61146c599bc94b82ca225349ee68762fc6c70d2dcf0");

Flowable<Response> fitnessFlowable = new WebRequest()
.getRemoteClient()
.create(FitnessApi.class)
.getFitnessData("5b238abb4d3590001d9b94a8",request.toMap());


fitnessFlowable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.takeUntil(response->response.getSummary().getNext()!=null)

.subscribe(new Subscriber<Response>() {
@Override
public void onSubscribe(Subscription s) {

s.request(Long.MAX_VALUE);
}

@Override
public void onNext(Response response) {

Log.e(TAG, "onNext" );

if(response !=null){

if(response.getFitness()!=null && response.getFitness().size()!=0){

Realm realm = Realm.getDefaultInstance();
realm.executeTransactionAsync(new Realm.Transaction() {
@Override
public void execute(Realm realm) {

realm.copyToRealmOrUpdate(response.getFitness());

}
}, new Realm.Transaction.OnSuccess() {
@Override
public void onSuccess() {

Log.i(TAG, "onSuccess , fitness data saved");

}
}, new Realm.Transaction.OnError() {
@Override
public void onError(Throwable error) {
Log.i(TAG, "onError , fitness data failed to save"+error.getMessage() );
}
});
}else{

Log.i(TAG, "onError , no fitness data available" );


}

}else{
Log.i(TAG, "onError , response is null" );

}
}

@Override
public void onError(Throwable t) {


Log.e(TAG, "onError" +t.getMessage());
}

@Override
public void onComplete() {

Log.e(TAG, "onComplete");
}
});;

return null;

}



Updated



Created RxBus to propagate events and error on UI


RxBus


public class RxBus {

private static final RxBus INSTANCE = new RxBus();

private RxBus(){}
private PublishSubject<Object> bus = PublishSubject.create();

public static RxBus getInstance() {
return INSTANCE;
}


public void send(Object o) {
bus.onNext(o);
}

public void error(Throwable e){bus.onError(e);}

public Observable<Object> toObservable() {
return bus;
}
}



in activity


FitnessRepo fitnessRepo = new FitnessRepo();
fitnessRepo.getFitnessData();
RxBus.getInstance().toObservable().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(Object o) {

if(o instanceof RealmList ){

RealmList<Fitness> realmList = (RealmList<Fitness>) o;
Log.e(TAG,"Fitness data size "+realmList.size());

}
}

@Override
public void onError(Throwable e) {

Log.e(TAG,e.getMessage()+ "");

if (e instanceof HttpException) {
ResponseBody body = ((HttpException) e).response().errorBody();


try {

Response response= LoganSquare.parse(body.byteStream(),Response.class);

if(response.getErrors() !=null)
if(response.getErrors().size() > 0)
Log.e(TAG, "Error "+response.getErrors().get(0).getErrors());
} catch (IOException t) {
t.printStackTrace();
}

}
}

@Override
public void onComplete() {

}
});



in a web service call


public void getFitnessData() {


Request request = new Request();
request.setAccess_token("d80fa6bd6f78cc704104d61146c599bc94b82ca225349ee68762fc6c70d2dcf0");
request.setEnd_date("2018-07-01T00:00:00");
Flowable<Response> fitnessFlowable = new WebRequest()
.getRemoteClient()
.create(FitnessApi.class)
.getFitnessData("5b238abb4d3590001d9b94a8",request.toMap());


fitnessFlowable.subscribeOn(Schedulers.io())
.takeUntil(response->response.getSummary().getNext()!=null)
.doOnNext((response) -> {
if(response ==null || response.getFitness() == null || response.getFitness().isEmpty()) {


Log.e(TAG, " Error ");
return;
}

RxBus.getInstance().send(response.getFitness());

try(Realm r = Realm.getDefaultInstance()) {
r.executeTransaction((realm) -> {
realm.copyToRealmOrUpdate(response.getFitness());
});
}
}).subscribe(item ->{


},
error ->{

RxBus.getInstance().error(error);


});
}




2 Answers
2



I have good news for you! You can delete almost all of that code and just make it generally better as a result!


public void fetchFitnessData() {

Request request = new Request();
request.setAccess_token("d80fa6bd6f78cc704104d61146c599bc94b82ca225349ee68762fc6c70d2dcf0");

Flowable<Response> fitnessFlowable = new WebRequest()
.getRemoteClient()
.create(FitnessApi.class)
.getFitnessData("5b238abb4d3590001d9b94a8",request.toMap());


fitnessFlowable.subscribeOn(Schedulers.io())
.takeUntil(response->response.getSummary().getNext()!=null)
.doOnNext((response) -> {
if(response ==null || response.getFitness() == null || response.getFitness().isEmpty()) return;

try(Realm r = Realm.getDefaultInstance()) {
r.executeTransaction((realm) -> {
realm.insertOrUpdate(response.getFitness());
});
}
}
}).subscribe();
}



This method is on a background thread now and returns void, so the way to emit stuff out of this method would be to use either a PublishSubject (one for success, one for failure) or an EventBus.


void


PublishSubject


EventBus


private PublishSubject<Object> fitnessResults;
public Observable<Object> observeFitnessResults() {
return fitnessResults;
}

public static class Success {
public Success(List<Fitness> data) {
this.data = data;
}

public List<Fitness> data;
}

public static class Failure {
public Failure(Exception exception) {
this.exception = exception;
}

public Exception exception;
}

public void fetchFitnessData() {
...
fitnessResults.onNext(new Success(data));
} catch(Exception e) {
fitnessResults.onNext(new Failure(e));



And then


errors = observeFitnessResults().ofType(Error.class);
success = observeFitnessResults().ofType(Success.class);





to start fitnessFlowable , i had to write s.request in subscribe rite ? and i am not sure how would i use PublishSubject here
– Hunt
Jun 28 at 10:20



s.request


PublishSubject





Can you tell me how can i use PublishSubject here ?
– Hunt
Jun 29 at 8:58


PublishSubject





i have made changes based on PublishSubject , can you see and verify my edits ?
– Hunt
Jul 2 at 10:44


PublishSubject





in the code that you have suggested has .subscribe(); function called in the last. As we are not disposing it , i am getting lint error i.e. the result of subscribe is not used , can you tell me how can we get rid off this warning
– Hunt
Jul 3 at 9:06


.subscribe();


lint


the result of subscribe



There are different ways to achieve this. I will never handle the subscriptions on my own out of a lifecycle scope as it creates a possibility of memory leak. In your case it seems that both success and failure is bound to the UI so you can simply do this.


public Completable fetchFitnessData() {

Request request = new Request();
request.setAccess_token("d80fa6bd6f78cc704104d61146c599bc94b82ca225349ee68762fc6c70d2dcf0");

Flowable<Response> fitnessFlowable = new WebRequest()
.getRemoteClient()
.create(FitnessApi.class)
.getFitnessData("5b238abb4d3590001d9b94a8",request.toMap());


return fitnessFlowable.subscribeOn(Schedulers.io())
.takeUntil(response->response.getSummary().getNext()!=null)
.doOnNext((response) -> {
if(response ==null || response.getFitness() == null || response.getFitness().isEmpty()) return;

try(Realm r = Realm.getDefaultInstance()) {
r.executeTransaction((realm) -> {
realm.insertOrUpdate(response.getFitness());
});
}
}
}).ignoreElements();



}



At UI level, you can just handle your subscription with both success and failure. In case you need success model can replace Completable with Single or Flowable.


fetchFitnessData.subscrible(Functions.EMPTY_ACTION, Timber::d);



The major advantage with this approach is that you handle your subscription lifecycles.





how would i initiate flowable as it requires s.request() to initiate it
– Hunt
Jun 28 at 10:51


s.request()





sorry I didn't get your question? where is s.request()?
– Yogesh Madaan
Jun 28 at 23:33





To initiate flowable one has to use request function in OnSubscribe
– Hunt
Jun 29 at 1:38


request


OnSubscribe





This thing is a little new to me. But I just checked. you can add doOnSubscribe add call this request() there
– Yogesh Madaan
Jun 29 at 3:41






By clicking "Post Your Answer", you acknowledge that you have read our updated terms of service, privacy policy and cookie policy, and that your continued use of the website is subject to these policies.

Popular posts from this blog

Rothschild family

Cinema of Italy