소소한 일상과 잡다한 정보

IT/Dart

Dart_6일차 : 고급 Stream 활용 (listen(), StreamController, Broadcast Stream)

pandada 2025. 3. 4. 22:11
반응형

 이번에는 Stream을 더 깊이 있게 활용하는 방법을 확인해보자.

 특히 listen(), StreamController, Broadcast Stream을 사용할 수 있게 되면 실시간 데이터 처리에 더 강력한 기능을 사용할 수 있다.

1. Stream의 기본 동작 복습 (await for)

 ✔️ await for를 사용한 기본 Stream

Stream<int> countStream(int maxCount) async* {
  for (int i = 1; i <= maxCount; i++) {
    await Future.delayed(Duration(seconds: 1));
    yield i;
  }
}

void main() async {
  print("⏳ 카운트 시작...");
  await for (var value in countStream(5)) {
    print("⏰ $value");
  }
  print("✅ 카운트 완료!");
}
  • yield를 사용해 값을 하나씩 방출
  • await for을 사용해 Stream을 반복문처럼 활용

 

2.  listen()을 사용한 Stream 구독

 await for 없이 Stream을 구독하고 데이터가 올 때마다 실행하는 방법이 listen()이다.

 ✔️ listen()을 사용한 Stream 구독

 Stream을 listen()으로 구독할 수 있음 / 데이터가 도착할 때마다 (value) { print(value); } 실행 / Stream이 끝나면 onDone 콜백 실행

Stream<int> countStream(int maxCount) async* {
  for (int i = 1; i <= maxCount; i++) {
    await Future.delayed(Duration(seconds: 1));
    yield i;
  }
}

void main() {
  print("⏳ 카운트 시작...");

  countStream(5).listen((value) { // listen()을 사용해 Stream 구독
    print("⏰ $value");
  }, onDone: () { // Stream이 끝나면 실행
    print("✅ 카운트 완료!");
  });
}

 

3.  StreamController를 사용한 실시간 데이터 처리

 기존 방식(async* yield) 대신 StreamController를 사용하면 실시간으로 데이터를 추가하고 관리할 수 있다.

 예를 들어, 버튼 클릭 이벤트, 채팅 메시지, 센서 데이터 처리 같은 기능을 만들 수 있다.

✔️ StreamController를 사용한 데이터 스트림

import 'dart:async';

void main() {
  //StreamController<int>를 생성 → int 타입의 데이터 스트림을 관리
  StreamController<int> controller = StreamController<int>();
  //이제 controller.add(value)를 사용해서 원하는 시점에 데이터를 보낼 수 있음

  // 스트림을 listen()으로 구독 → 새로운 데이터가 올 때마다 실행됨
  controller.stream.listen((value) {
    print("📩 받은 데이터: $value");
  }, onDone: () { //onDone:을 사용하면 스트림이 종료될 때 실행할 코드를 추가 가능
    print("✅ 데이터 스트림 종료!");
  });

  // 1초마다 데이터 추가 (비동기)
  Future.delayed(Duration(seconds: 1), () => controller.add(1));
  Future.delayed(Duration(seconds: 2), () => controller.add(2));
  Future.delayed(Duration(seconds: 3), () => controller.add(3));
  // 1초 후 1, 2초 후 2, 3초 후 3을 스트림에 추가
  // 구독한 listen()에서 값을 받을 때마다 📩 받은 데이터: X가 출력됨


  // 4초 후 스트림 종료
  Future.delayed(Duration(seconds: 4), () => controller.close());
  // 스트림이 종료되면 onDone:에 설정된 "✅ 데이터 스트림 종료!"가 실행됨
}
  • 데이터 스트림 생성 → StreamController<T> 객체를 생성
  • StreamController<int>를 생성하여 직접 데이터 추가 (controller.add(value))
  • listen()을 사용해 데이터를 받을 때마다 실행
  • controller.close()를 사용해 Stream을 종료할 수 있음

 

✔️ StreamController의 활용 예시 ( 사용자 입력을 실시간으로 스트림에 추가하는 경우 )

import 'dart:async';

void main() {
  //StreamController<int>를 생성 → int 타입의 데이터 스트림을 관리
  StreamController<int> controller = StreamController<int>();
  //이제 controller.add(value)를 사용해서 원하는 시점에 데이터를 보낼 수 있음

  // 스트림을 listen()으로 구독 → 새로운 데이터가 올 때마다 실행됨
  controller.stream.listen((value) {
    print("📩 받은 데이터: $value");
  }, onDone: () { //onDone:을 사용하면 스트림이 종료될 때 실행할 코드를 추가 가능
    print("✅ 데이터 스트림 종료!");
  });

  // 1초마다 데이터 추가 (비동기)
  Future.delayed(Duration(seconds: 1), () => controller.add(1));
  Future.delayed(Duration(seconds: 2), () => controller.add(2));
  Future.delayed(Duration(seconds: 3), () => controller.add(3));
  // 1초 후 1, 2초 후 2, 3초 후 3을 스트림에 추가
  // 구독한 listen()에서 값을 받을 때마다 📩 받은 데이터: X가 출력됨


  // 4초 후 스트림 종료
  Future.delayed(Duration(seconds: 4), () => controller.close());
  // 스트림이 종료되면 onDone:에 설정된 "✅ 데이터 스트림 종료!"가 실행됨
}
  • 사용자가 입력할 때마다 StreamController를 통해 데이터를 추가(add())
  • 입력된 데이터가 실시간으로 listen()을 통해 출력됨
  • 사용자가 "exit"을 입력하면 close()를 호출하여 스트림을 종료

 

✔️ StreamController를 사용하는 이유

  • 비동기적으로 데이터를 추가하고 관리 가능 (add(value))
  • 이벤트 기반 데이터 처리가 가능 (사용자 입력, 센서 데이터, 버튼 클릭 등)
  • 일반적인 async*, yield보다 더 유연한 데이터 흐름 제어 가능
  • 스트림을 필요할 때 닫을 수 있음 (close())
  • StreamController를 사용하면 스트림을 더 유연하게 관리 가능
  • add(value)를 사용해서 필요한 시점에 데이터를 추가 가능
  • listen()을 사용해서 데이터를 받을 때마다 처리 가능


4. Broadcast Stream (멀티 리스너 지원)

 기본적으로 Stream 한 번만 구독할 수 있는 Single Subscription Stream이지만, broadcast Stream을 사용하면 여러 개의 listen()을 동시에 실행할 수 있다.

✔️ Broadcast Stream을 사용하여 여러 리스너 구독

import 'dart:async';

void main() {
  StreamController<int> controller = StreamController<int>.broadcast(); // broadcast Stream

  // 첫 번째 리스너
  controller.stream.listen((value) {
    print("👤 Listener 1: $value");
  });

  // 두 번째 리스너
  controller.stream.listen((value) {
    print("👥 Listener 2: $value");
  });

  // 1초마다 데이터 추가
  Future.delayed(Duration(seconds: 1), () => controller.add(10));
  Future.delayed(Duration(seconds: 2), () => controller.add(20));
  Future.delayed(Duration(seconds: 3), () => controller.add(30));

  // 4초 후 스트림 종료
  Future.delayed(Duration(seconds: 4), () => controller.close());
}
  • StreamController.broadcast()를 사용하면 여러 개의 listen() 실행 가능
  • Listener 1과 Listener 2가 같은 데이터 스트림을 동시에 받을 수 있음
  • 멀티플레이 게임의 실시간 채팅, 주식 가격 업데이트 같은 기능에 활용 가능

 

 📌 기본 StreamController와 Broadcast Stream 차이

  기본 StreamController StreamController.broadcast()
구독 가능 리스너 수 1개 (Single Subscription Stream) 여러 개 가능 (Broadcast Stream)
동작 방식 한 번만 listen() 가능 여러 개의 listen()에서 동시에 구독 가능
사용 예시 파일 다운로드, HTTP 요청 처리 실시간 채팅, 주식 가격 업데이트, 센서 데이터

 ✔️ 일반 StreamController는 하나의 listen()만 가능

import 'dart:async';

void main() {
  StreamController<int> controller = StreamController<int>(); // 기본 StreamController

  // 첫 번째 리스너
  controller.stream.listen((value) {
    print("👤 Listener 1: $value");
  });

  // 두 번째 리스너 추가하면 오류 발생 ❌
  controller.stream.listen((value) { print("👥 Listener 2: $value"); });

  controller.add(10);
  controller.add(20);
  controller.close();
}

 

< 에러 내용 >

  • 에러 내용을 살펴보면 Stream은 이미 listen 하고 있다고 표기가 된다.
  • 이미 첫 번째 리스너를 사용하고 있기 때문에 일반 Stream은 안된다는 이유이다.

📌 결론 : Broadcast Stream을 사용하면 좋은 경우

  • 기본 StreamController는 하나의 listen()만 가능
  • StreamController.broadcast()를 사용하면 여러 개의 listen() 가능
  • broadcast Stream은 실시간 데이터 공유가 필요한 경우 유용함
  • 실시간 채팅으로 여러 사용자가 동시에 같은 메시지를 받을 때 / 여러 클라이언트가 같은 데이터를 구독할 때
  • 여러 시스템이 동일한 센서 데이터를 받아야 할 때 / 한 이벤트를 여러 구독자가 동시에 받아야 할 때 등 같은 내용을 동시 공유할 때 사용
반응형

✔️ 테스트 예제 코드 1

 위에서 진행했던 내용을 토대로 충분한 실습을 진행한 후, 테스트 예제를 진행해보도록 하자.

 1. listen()을 사용하여 사용자 입력을 실시간으로 출력하는 프로그램

 2. stdin.listen()을 사용하여 입력 즉시 실행

 3. utf8.decode()를 사용하여 문자열로 변환

import 'dart:async';
import 'dart:io';
import 'dart:convert'; // utf8.decode()를 사용하기 위해 필요

void main() {
  StreamController<String> controller = StreamController<String>(); // StreamController 생성

  // Stream을 listen()으로 구독 (입력 즉시 실행)
  controller.stream.listen((input) {
    print("📩 입력된 데이터: $input");
  }, onDone: () {
    print("✅ 스트림 종료!");
  });

  // stdin.listen()을 사용하여 사용자 입력을 실시간으로 추가
  stdin.listen((inputData) {
    String userInput = utf8.decode(inputData).trim(); // 바이트 데이터를 문자열로 변환

    if (userInput.toLowerCase() == "exit") {
      controller.close();
      print("🚪 프로그램을 종료합니다.");
      exit(0); // 프로그램 종료
    }

    controller.add(userInput); // 입력된 데이터를 즉시 스트림에 추가
  });
}

 

 

✔️ 테스트 예제 코드 2

 1. StreamController를 사용하여 사용자가 입력한 숫자를 받아 스트림으로 처리하는 프로그램

 2. 사용자가 숫자를 입력하면 StreamController를 통해 실시간으로 전달

 3. 숫자가 입력될 때마다 listen()을 사용해 출력

 4. 사용자가 "exit"을 입력하면 스트림 종료 및 프로그램 종료

import 'dart:async';
import 'dart:io';
import 'dart:convert'; // utf8.decode() 사용을 위해 필요

void main() {
  StreamController<int> controller = StreamController<int>(); // StreamController 생성

  // Stream을 listen()으로 구독 (숫자 입력 시 출력)
  controller.stream.listen((number) {
    print("📩 입력된 숫자: $number");
  }, onDone: () {
    print("✅ 스트림 종료!");
  });

  // stdin.listen()을 사용하여 사용자 입력을 실시간으로 추가
  stdin.listen((inputData) {
    String userInput = utf8.decode(inputData).trim(); // 입력값을 문자열로 변환

    if (userInput.toLowerCase() == "exit") {
      controller.close();
      print("🚪 프로그램을 종료합니다.");
      exit(0); // 프로그램 종료
    }

    int? number = int.tryParse(userInput); // 문자열을 정수로 변환
    if (number == null) {
      print("⚠️ 숫자를 입력하세요.");
    } else {
      controller.add(number); // 숫자를 스트림에 추가
    }
  });
}
  • StreamController<int> 생성 → 숫자를 처리하는 스트림을 관리
  • listen()을 사용하여 숫자가 입력될 때마다 출력
  • stdin.listen()을 사용하여 사용자가 입력한 데이터를 실시간으로 읽음
  • 입력값을 int.tryParse()로 숫자로 변환하고, 변환에 실패하면 오류 메시지 출력
  • 사용자가 "exit"을 입력하면 스트림을 종료하고 프로그램을 종료

 


 이렇게 고급 Stream 활용 (listen(), StreamController, Broadcast Stream)에 대해서 알아보았다. 추가 적인 내용이 필요한 경우에는 댓글을 요청드리고, 틀린 부분이 있다면 이것 또한 댓글로 알려주시면 수정하도록 하겠습니다!


 

반응형