본문 바로가기
Discord/Discord Bot Python

디스코드 유튜브 음악 재생 봇 - 루프(반복 재생) 기능

by 깐테 2025. 2. 5.

2024.12.16 - [Discord/Discord Bot Python] - [Discord Bot Python] 디스코드 유튜브 음악 재생 봇 - 플레이리스트

 

[Discord Bot Python] 디스코드 유튜브 음악 재생 봇 - 플레이리스트

2023.05.18 - [Discord/Discord Bot Python] - Discord Bot 만들기 - 유튜브 음악 재생 봇\ Discord Bot 만들기 - 유튜브 음악 재생 봇Discord Bot 만들기 - 음성 채널 입장하기 Discord Bot 만들기 - 음성 채널 입장하기디

kante-kante.tistory.com

 

이전에 만들었던 플레이리스트 기능에 이어 간단한 루프 기능을 만들어보도록 한다.

 


 

이번 포스트에서는 기존 유튜브 음악 재생 봇에 반복 재생 기능을 추가하여 사용자가 대기열에 추가한 음악을 반복 재생하는 기능을 만들어보려 한다.

 

여기서 소개할 반복 재생 기능에는 한 곡 반복 재생 기능과 전체 반복 재생 기능이 있는데, 해당 기능들을 간단하게 만들어서 작성해보려 한다.

 

설치에 필요한 라이브러리는 위 플레이 리스트 기능을 설명한 포스팅과 동일하므로 해당 포스트를 참조하면 된다.

 

 

 

코드 설명

해당 코드에서 커맨드 함수를 수정해야 하기 때문에 코드 순서가 뒤죽박죽인 부분 참조.

class Music(commands.Cog):
    def __init__(self, bot):
        self.bot = bot
        self.queue = []
        self.current_player = None
        self.max_queue_size = 10  # 대기열 최대 길이 설정
        self.loop = False
        self.loop_current = False

 

전체 곡을 반복 재생하는데 설정할 loop 변수와 loop_current 변수를 만들어 초기화한다.

 

 

@commands.command(aliases=['반복'])
async def loop(self, ctx):
    """전체 곡을 반복 재생합니다."""
    if not ctx.voice_client:
        await ctx.send("음성 채널에 연결되어 있지 않습니다.")
        return
        
    self.loop = not self.loop
    self.loop_current = False # 한곡 반복과 전체 반복 구분

    if self.loop:
        await ctx.send("전체 곡 반복이 활성화되었습니다.")
    else:
        await ctx.send("전체 곡 반복이 비활성화되었습니다.")

@commands.command(aliases=['한곡반복','현재곡반복'])
async def loop_one(self, ctx):
    """현재 재생 중인 곡을 반복 재생합니다."""
    if not ctx.voice_client or not ctx.voice_client.is_playing():
        await ctx.send("현재 재생 중인 곡이 없거나 음성 채널에 연결되어 있지 않습니다.")
        return
        
    self.loop_current = not self.loop_current
    self.loop = False # 한곡 반복과 전체 반복 구분

    if self.loop_current:
        await ctx.send("현재 곡 반복이 활성화되었습니다.")
    else:
        await ctx.send("현재 곡 반복이 비활성화되었습니다.")

 

새로운 커맨드 loop와 loop_one 커맨드를 만들어 주었다.

 

loop

(명령어: !loop, !반복)

 

해당 부분은 곡을 전체 반복하는 데 사용하는 코드. 현재 곡 반복과 전체 곡 반복은 동시에 실행할 수 없기 때문에, 두 기능 중 하나만 사용하도록 처리했다.

 

 

loop_one

(명령어: !loop_one, !한곡반복, !현재곡반복)

 

현재 재생 중인 곡을 반복하는 데 사용하는 코드. 현재 곡 반복 모드를 켜면 대기열에 곡이 있어도 현재 재생중인 곡만 반복 재생한다.

 

 

@commands.command(aliases=["다음"])
async def skip(self, ctx):
    """대기열에서 다음 곡을 재생합니다"""
    if not ctx.voice_client:
        await ctx.send("봇이 음성 채널에 연결되어 있지 않습니다.")
        return
 
    if not self.queue:  # 대기열이 비어있는지 확인
        await ctx.send("다음 재생할 곡이 대기열에 없습니다.\n음악을 계속 재생하시려면 음악을 추가해주세요.")
        return
        
    # 반복 모드에서 스킵 제한.
    if self.loop or self.loop_current:
        await ctx.send("반복 모드를 종료하고 skip 명령어를 입력해주세요.")
        return
 
    # 현재 재생중인 음악 중지
    if ctx.voice_client.is_playing():
        ctx.voice_client.stop()
 
    try: 
        current_url, current_title = self.queue.pop(0)  # 바로 대기열에서 제거
 
        async with ctx.typing():
            self.current_player = await YTDLSource.from_url(current_url, loop=self.bot.loop, stream=True)
 
            ctx.voice_client.play(self.current_player, after=lambda _: asyncio.run_coroutine_threadsafe(self.play_next(ctx), self.bot.loop))
 
            await ctx.send(f'지금 재생 중: {self.current_player.title}')
 
    except Exception as e:
        await ctx.send(f"재생 중 오류가 발생했습니다: {str(e)}")
        print(f"재생 오류: {e}")
        # 오류 발생 시 대기열에 기존 재생 곡 추가
        if current_url and current_title:
            self.queue.insert(0, (current_url, current_title))

 

Skip

skip 부분은 기존과 거의 비슷하지만 약간 수정해주었다.

 

현재 전체 작성된 코드가 상태를 저장하는 데 좋지 않기 때문에 스킵 기능은 루프 기능을 꺼둔 상태에서만 실행하도록 일단 처리해두었다. 추후 클래스를 분리하여 재생 가능하도록 처리가 필요할 듯 싶다.

 

 

 

async def play_next(self, ctx):
    if not ctx.voice_client:
        return

    try:
        if self.loop_current and self.current_player:
            # 현재 곡 반복 재생 처리
            current_url = self.current_player.data['url']
            self.current_player = await YTDLSource.from_url(current_url, loop=self.bot.loop, stream=True)
            ctx.voice_client.play(self.current_player, after=lambda _: asyncio.run_coroutine_threadsafe(self.play_next(ctx), self.bot.loop))
            return

        if self.loop and self.current_player:
            # 전체 곡 반복 재생 처리
            current_url = self.current_player.data['url']
            current_title = self.current_player.title
            self.queue.append((current_url, current_title))

        if len(self.queue) > 0:
            # 다음 곡 재생
            next_url, next_title = self.queue.pop(0)
            self.current_player = await YTDLSource.from_url(next_url, loop=self.bot.loop, stream=True)
            ctx.voice_client.play(self.current_player, after=lambda _: asyncio.run_coroutine_threadsafe(self.play_next(ctx), self.bot.loop))

            # play_next에서 호출될 때는 메시지를 보내지 않음
            if ctx.command and ctx.command.name == "skip":
                await ctx.send(f'지금 재생 중: {self.current_player.title}')

    except Exception as e:
        await ctx.send(f"재생 중 오류가 발생했습니다: {str(e)}")
        print(f"재생 오류: {e}")
        # 오류 발생 시 대기열에 기존 재생 곡 추가
        if current_url and current_title:
            self.queue.insert(0, (current_url, current_title))

 

play_next

해당 부분은 커맨드로 실행되는 부분이 아니고 루프 상태에서 음악 재생을 처리하는 코드다.

if self.loop_current and self.current_player:
    # 현재 곡 반복 재생 처리
    current_url = self.current_player.data['url']
    self.current_player = await YTDLSource.from_url(current_url, loop=self.bot.loop, stream=True)
    ctx.voice_client.play(self.current_player, after=lambda _: asyncio.run_coroutine_threadsafe(self.play_next(ctx), self.bot.loop))
    return

 

현재 곡을 반복 재생하는 경우 현재 재생중인 곡의 url 정보를 추출하여 플레이어에 현재 곡 정보만 계속 추가하는 방식.

 

if self.loop and self.current_player:
    # 전체 곡 반복 재생 처리
    current_url = self.current_player.data['url']
    current_title = self.current_player.title
    self.queue.append((current_url, current_title))

 

전체 곡을 반복 하는 경우 현재 재생중인 곡이 끝나면 대기열의 가장 마지막에 추가하는 방법을 사용.

이렇게 루프가 켜진 경우 사용자가 추가한 대기열에 있는 곡이 계속 반복 재생된다.

 

 

@commands.command()
async def stop(self, ctx):
    """재생을 멈추고 음성 채널에서 나갑니다"""
    if not ctx.voice_client:
        await ctx.send("봇이 음성 채널에 연결되어 있지 않습니다.")
        return

    self.queue.clear()
    self.loop_current = False
    self.loop = False
    self.current_player = None

    if ctx.voice_client.is_playing():
        ctx.voice_client.stop()
    await ctx.voice_client.disconnect()
    await ctx.send("재생을 멈추고 채널에서 나갔습니다.")

 

stop 커맨드 부분도 수정해준다.

 

stop 명령어 입력 시 봇이 재생하던 음악을 종료하고 음성 채널에서 나가도록 처리해두었기 때문에 기존에 사용했던 반복 기능을 전부 끄고 플레이어 정보도 초기화한다.

 

 

Music 클래스 전체 코드

class Music(commands.Cog):
    def __init__(self, bot):
        self.bot = bot
        self.queue = []
        self.current_player = None
        self.max_queue_size = 10  # 대기열 최대 길이 설정
        self.loop = False
        self.loop_current = False

    @commands.command(aliases=["입장"])
    async def join(self, ctx):
        """음성 채널에 입장합니다"""
        if not ctx.author.voice:
            await ctx.send("음성 채널에 먼저 입장해주세요.")
            return
            
        channel = ctx.author.voice.channel
        
        if ctx.voice_client is not None:
            return await ctx.voice_client.move_to(channel)

        await channel.connect()
        await ctx.send(f"{channel} 채널에 입장했습니다.")
    
    async def play_next(self, ctx):
        if not ctx.voice_client:
            return
        
        try:
            if self.loop_current and self.current_player: 
                current_url = self.current_player.data['url']
                self.current_player = await YTDLSource.from_url(current_url, loop=self.bot.loop, stream=True)
                ctx.voice_client.play(self.current_player, after=lambda _: asyncio.run_coroutine_threadsafe(self.play_next(ctx), self.bot.loop))
                return

            if self.loop and self.current_player:
                current_url = self.current_player.data['url']
                current_title = self.current_player.title
                self.queue.append((current_url, current_title))

            if len(self.queue) > 0:
                # 다음 곡 재생
                next_url, next_title = self.queue.pop(0)
                self.current_player = await YTDLSource.from_url(next_url, loop=self.bot.loop, stream=True)
                ctx.voice_client.play(self.current_player, after=lambda _: asyncio.run_coroutine_threadsafe(self.play_next(ctx), self.bot.loop))
                
                # play_next에서 호출될 때는 메시지를 보내지 않음
                if ctx.command and ctx.command.name == "skip":
                    await ctx.send(f'지금 재생 중: {self.current_player.title}')
                    
        except Exception as e:
            await ctx.send(f"재생 중 오류가 발생했습니다: {str(e)}")
            print(f"재생 오류: {e}")
            # 오류 발생 시 대기열에 기존 재생 곡 추가
            if current_url and current_title:
                self.queue.insert(0, (current_url, current_title))
    
    @commands.command(aliases=["다음"])
    async def skip(self, ctx):
        """대기열에서 다음 곡을 재생합니다"""
        if not ctx.voice_client:
            await ctx.send("봇이 음성 채널에 연결되어 있지 않습니다.")
            return
            
        if not self.queue:  # 대기열이 비어있는지 확인
            await ctx.send("다음 재생할 곡이 대기열에 없습니다.\n음악을 계속 재생하시려면 음악을 추가해주세요.")
            return
            
        # 반복 모드에서 스킵 제한.
        if self.loop or self.loop_current:
            await ctx.send("반복 모드를 종료하고 skip 명령어를 입력해주세요.")
            return

        try:
            # 현재 재생중인 음악 중지
            if ctx.voice_client.is_playing():
                ctx.voice_client.stop()
                
            next_url, next_title = self.queue.pop(0)
            self.current_player = await YTDLSource.from_url(next_url, loop=self.bot.loop, stream=True)

            ctx.voice_client.play(self.current_player, after=lambda _: asyncio.run_coroutine_threadsafe(self.play_next(ctx), self.bot.loop))
            await ctx.send(f'지금 재생 중: {self.current_player.title}')
        except Exception as e:
            await ctx.send(f"재생 중 오류가 발생했습니다: {str(e)}")
            print(f"재생 오류: {e}")
            # 오류 발생 시 대기열에 기존 재생 곡 추가
            if next_url and next_title:
                self.queue.insert(0, (next_url, next_title))
   
    @commands.command(aliases=["재생"])
    async def play(self, ctx, *, url):
        """URL에서 음악을 재생하고 대기열에 추가합니다"""
        
        try:
            if not ctx.author.voice:
                await ctx.send("음성 채널에 먼저 입장해주세요.")
                return

            if not ctx.voice_client:
                await ctx.author.voice.channel.connect()
                
            if len(self.queue) >= self.max_queue_size:
                await ctx.send(f"대기열이 가득 찼습니다. 최대 {self.max_queue_size}곡까지만 추가할 수 있습니다.")
                return
            

            # 추가하려는 곡의 정보를 미리 가져옴
            async with ctx.typing():
                player = await YTDLSource.from_url(url, loop=self.bot.loop, stream=True)

                if not ctx.voice_client.is_playing():
                    self.current_player = player
                    ctx.voice_client.play(self.current_player, after=lambda _: asyncio.run_coroutine_threadsafe(self.play_next(ctx), self.bot.loop))
                    await ctx.send(f'지금 재생 중: {self.current_player.title}')
                else:
                    self.queue.append((url, player.title))
                    queue_info = f'대기열에 "{player.title}" 노래가 추가되었습니다.\n현재 대기열 ({len(self.queue)}곡):\n'
                    for i, (_, title) in enumerate(self.queue, 1):
                        queue_info += f"{i}. {title}\n"
                    await ctx.send(queue_info)
                    
        except Exception as e:
            await ctx.send(f"음악을 추가하는 중 오류가 발생했습니다: {str(e)}")
            print(f"재생 오류: {e}")

    @commands.command(aliases=["음량","볼륨","소리"])
    async def volume(self, ctx, volume: int):
        """플레이어의 볼륨을 조절합니다"""

        if ctx.voice_client is None:
            return await ctx.send("음성 채널에 연결되어 있지 않습니다.")

        if not 0 <= volume <= 100:
            return await ctx.send("볼륨은 0에서 100 사이의 값이어야 합니다.")

        ctx.voice_client.source.volume = volume / 100
        await ctx.send(f"볼륨이 {volume}%로 변경되었습니다")

    @commands.command()
    async def stop(self, ctx):
        """재생을 멈추고 음성 채널에서 나갑니다"""
        if not ctx.voice_client:
            await ctx.send("봇이 음성 채널에 연결되어 있지 않습니다.")
            return
            
        self.queue.clear()
        self.loop_current = False
        self.loop = False
        self. current_player = None

        if ctx.voice_client.is_playing():
            ctx.voice_client.stop()
        await ctx.voice_client.disconnect()
        await ctx.send("재생을 멈추고 채널에서 나갔습니다.")
        
    @commands.command()
    async def pause(self, ctx):
        """음악을 일시정지합니다"""
        if not ctx.voice_client:
            await ctx.send("봇이 음성 채널에 연결되어 있지 않습니다.")
            return

        if ctx.voice_client.is_paused() or not ctx.voice_client.is_playing():
            await ctx.send("음악이 이미 일시 정지 중이거나 재생 중이지 않습니다.")
            return
            
        ctx.voice_client.pause()
        await ctx.send("음악이 일시정지되었습니다.")
            
    @commands.command()
    async def resume(self, ctx):
        """일시정지된 음악을 다시 재생합니다"""
        if not ctx.voice_client:
            await ctx.send("봇이 음성 채널에 연결되어 있지 않습니다.")
            return

        if ctx.voice_client.is_playing() or not ctx.voice_client.is_paused():
            await ctx.send("음악이 이미 재생 중이거나 재생할 음악이 존재하지 않습니다.")
            return
            
        ctx.voice_client.resume()
        await ctx.send("음악이 다시 재생됩니다.")

    @commands.command(aliases=["q","플레이리스트","대기열"])
    async def queue(self, ctx):
        """현재 대기열을 보여줍니다"""
        if len(self.queue) == 0:
            await ctx.send("대기열이 비어있습니다.")
            return
            
        queue_list = f"현재 대기열 ({len(self.queue)}/{self.max_queue_size}곡):\n"
        for i, (_, title) in enumerate(self.queue, 1):
            queue_list += f"{i}. {title}\n"
        await ctx.send(queue_list)

    @commands.command()
    async def now(self, ctx):
        """현재 재생중인 음악의 제목을 보여줍니다"""
        if not ctx.voice_client or not ctx.voice_client.is_playing():
            await ctx.send("현재 재생 중인 음악이 없습니다.")
            return
            
        if self.current_player:
            await ctx.send(f"현재 재생 중: {self.current_player.title}")
        else:
            await ctx.send("현재 재생 중인 음악 정보를 가져올 수 없습니다.")

    @commands.command(aliases=["삭제", "제거"])
    async def remove(self, ctx, index: int):
        """대기열에서 특정 곡을 삭제합니다"""
        if len(self.queue) == 0:
            await ctx.send("대기열이 비어있습니다.")
            return
            
        if not 1 <= index <= len(self.queue):
            await ctx.send(f"올바른 번호를 입력해주세요. (1 ~ 대기열 길이`({len(self.queue)})`)")
            return
            
        _, removed_title = self.queue.pop(index-1)
        await ctx.send(f"대기열에서 {index}번 곡 '{removed_title}'이(가) 제거되었습니다.")
    
    @commands.command(aliases=['반복'])
    async def loop(self, ctx):
        """전체 곡을 반복 재생합니다."""
        if not ctx.voice_client:
            await ctx.send("음성 채널에 연결되어 있지 않습니다.")
            return
        
        self.loop = not self.loop
        self.loop_current = False # 한곡 반복과 전체 반복 구분

        if self.loop:
            await ctx.send("전체 곡 반복이 활성화되었습니다.")
        else:
            await ctx.send("전체 곡 반복이 비활성화되었습니다.")

    @commands.command(aliases=['한곡반복','현재곡반복'])
    async def loop_one(self, ctx):
        """현재 재생 중인 곡을 반복 재생합니다."""
        if not ctx.voice_client or not ctx.voice_client.is_playing():
            await ctx.send("현재 재생 중인 곡이 없거나 음성 채널에 연결되어 있지 않습니다.")
            return
        
        self.loop_current = not self.loop_current
        self.loop = False # 한곡 반복과 전체 반복 구분

        if self.loop_current:
            await ctx.send("현재 곡 반복이 활성화되었습니다.")
        else:
            await ctx.send("현재 곡 반복이 비활성화되었습니다.")

    @play.before_invoke
    @skip.before_invoke
    async def ensure_voice(self, ctx):
        if ctx.voice_client is None:
            if ctx.author.voice:
                await ctx.author.voice.channel.connect()
            else:
                await ctx.send("음성 채널에 먼저 입장해주세요.")
                raise commands.CommandError("사용자가 음성 채널에 연결되어 있지 않습니다.")

 

전체 동작 코드에 여기서 작성한 Music 클래스를 붙여넣어 사용하면 된다.

 

동작 방식은 기존 작성했던 명령어와 동일하다.

 

명령어: !play '유튜브 링크' 

ex) !play https://www.youtube.com/~

 


 

오류 Q&A

** 해당 코드에서 사용한 Python 버전은 3.12.8 입니다.

Q: 봇을 실행시키고 명령어는 동작하는데, 봇이 아무 반응이 없어요(소리가 나오지 않아요)

A: 
1. 디스코드 개발자 포탈의 OAuth2 설정에서 bot의 권한이 Admin이거나, bot 권한 중 음성과 text 권한을 허용해주셔야 합니다.
2. 디스코드 개발자 포탈의 Privileged Gateway Intents 설정의 3가지 권한이 모두 허용되어 있어야 합니다.
3. 디스코드의 사용자 설정 - 음성 및 오디오 - 출력 장치의 설정이 사용자가 설정한 스피커로 설정되어 있는지 확인해주셔야 합니다.
4. IDE(VSCode) 터미널에서 오류 메시지가 출력되지는 않았는지 확인하셔야 합니다.

5. 디스코드 봇이 들어오고 초록색으로 표시되는 경우, !volume 50 명령어를 입력하셔서 볼륨 설정이 잘못되지 않았는지 확인하셔야 합니다.

 

위 방법 모두 동작하지 않는다면 python 버전을 최신(3.12) 버전으로 재설치 후 라이브러리를 pip install 해주시기 바랍니다.

pip install discord[voice]
pip install discord.py
pip install yt-dlp
pip install PyNaCl

 

파이썬 재설치 후 ffmpeg를 파이썬이 설치된 경로에 넣어주시기 바랍니다.

 

참조: VSCode에서 파이썬 버전 설정 방법

1. VSCode 우측 하단 파이썬 버전 클릭

 

2. 파이썬 버전을 3.12로 설정

 

Python 3.9 버전은 실행 불가 확인. 최신 버전의 파이썬 사용을 권장합니다. 

 

 

Q: TypeError가 발생해요.

A: 오류 메시지가 출력되는 부분에서 잘못 작성한 코드 또는 전체 코드를 빠뜨린 부분이 없는지 확인 부탁드립니다.

 

Q: 음악 재생이 원활하지 않거나 재생하는 데 시간이 걸려요.

A: 노트북과 같은 무선 네트워크 환경 등 네트워크가 원활하지 않은 환경에서 코드 실행 시 음악의 앞 부분이 잘려서 출력되는 현상 또는 음악이 밀리는 현상이 발생할 수 있습니다. 네트워크 환경이 원활한 상태에서 코드를 실행하시길 권장드립니다.

 

Q: dico_token이라는 파일이 없다고 떠요

A: 

1. dico_token.py라는 이름으로 파일을 만들어줍니다.(파일 이름은 사용자 마음대로 지정하셔도 됩니다.)

 

2. 디스코드 개발자 포털 - Settings - Bot - Reset Token 버튼을 클릭하여 토큰을 발급받아 복사합니다.

 

3. 발급받아 복사한 토큰을 아까 만들어 두었던 파일에 붙여넣기 합니다.

 

 

기타 오류 사항은 아래 페이지를 확인해주시기 바랍니다.

 

https://kante-kante.tistory.com/38

 

 

 

반응형