본문 바로가기
Discord/Discord Bot Python

[Discord Bot Python]디스코드 유튜브 음악 재생 봇 - 유튜브 음악 검색 기능

by 깐테 2025. 3. 18.

2023.05.18 - [Discord/Discord Bot Python] - Discord Bot 만들기 - 유튜브 음악 재생 봇

 

Discord Bot 만들기 - 유튜브 음악 재생 봇

Discord Bot 만들기 - 음성 채널 입장하기 Discord Bot 만들기 - 음성 채널 입장하기디스코드 봇을 사용해본 경험이 있다면, 유튜브 혹은 사운드 클라우드 등의 앱에서 음악을 재생해주는 봇을 사용해

kante-kante.tistory.com

해당 포스트의 유튜브 음악 재생 봇 기본 코드는 위 페이지를 참조해주시기 바랍니다.

 

2024.06.04 - [Discord/Discord Bot Python] - 디스코드 봇 - 유튜브 음악 재생 봇 설정 및 실행 방법

 

디스코드 봇 - 유튜브 음악 재생 봇 설정 및 실행 방법

2023.05.16 - [Discord/Discord Bot Python] - Discord Bot 만들기 - 음성 채널 입장하기 Discord Bot 만들기 - 음성 채널 입장하기디스코드 봇을 사용해본 경험이 있다면, 유튜브 혹은 사운드 클라우드 등의 앱에서

kante-kante.tistory.com

디스코드 봇 관련 설정은 위 페이지를 참조해주시기 바랍니다.


 

기존 플레이리스트, 루프 기능에 이어 디스코드 봇으로 유튜브 음악 검색 기능에 대해 설명해보려 한다.

 

이번 포스트에서는 yt-dlp 라이브러리를 이용해 디스코드 봇에서 유튜브 음악을 검색하면 검색 결과를 출력하고, 사용자가 선택한 음악을 재생하는 것 까지 지원하는 간단한 코드를 작성한다.

 

 

설치에 필요한 라이브러리

import discord, asyncio
from discord.ext import commands
from dico_token import Token
import yt_dlp as youtube_dl

 

사용된 라이브러리는 기존 코드와 동일하다.

 

 

코드 설명

1. 유튜브 검색

async def search_youtube(self, query):
        ytdl_opts = {
            'format': 'bestaudio/best',
            'quiet': True,
            'no_warnings': True,
            'default_search': 'ytsearch5',
            'extract_flat': True,
            'skip_download': True,
            'force_generic_extractor': True,
        }

        with youtube_dl.YoutubeDL(ytdl_opts) as ytdl:
            try:
                result = ytdl.extract_info(f"ytsearch5:{query}", download= False)
                if 'entries' in result:
                    # list comprehension
                    return [
                        (
                            f"https://www.youtube.com/watch?v={entry['id']}",
                            entry.get('title', 'N/A'),
                            f"{int(entry.get('duration', 0)) // 60}:{int(entry.get('duration', 0)) % 60:02d}" if entry.get('duration') else 'N/A'
                        )
                        for entry in result['entries']
                    ]
                return None
            except Exception as e:
                print(f"Search Error: {e}")
                return None

 

search_youtube 라는 이름의 함수를 만들어 주었다.

 

ytdl_opts = {
            'format': 'bestaudio/best',
            'quiet': True,
            'no_warnings': True,
            'default_search': 'ytsearch5',
            'extract_flat': True,
            'skip_download': True,
            'force_generic_extractor': True,
        }

 

ytdl_opts는 유튜브에서 음악을 가져올 설정을 지정하는 변수이므로 해당 옵션은 사용자에 맞게 설정하면 된다.

 

default_search 부분의 ytsearch "5" 로 설정해두었는데, 이는 유튜브에서 음악을 검색할 때, 기본적으로 몇개의 음악 목록을 가져와야 할 지 선택하는 부분이다. 해당 설정 부분의 숫자를 바꿔주면 원하는 만큼 목록 개수를 가져올 수 있으므로 설정에 참고한다.

 

with youtube_dl.YoutubeDL(ytdl_opts) as ytdl:
            try:
                result = ytdl.extract_info(f"ytsearch5:{query}", download= False)
                if 'entries' in result:
                    # list comprehension
                    return [
                        (
                            f"https://www.youtube.com/watch?v={entry['id']}",
                            entry.get('title', 'N/A'),
                            f"{int(entry.get('duration', 0)) // 60}:{int(entry.get('duration', 0)) % 60:02d}" if entry.get('duration') else 'N/A'
                        )
                        for entry in result['entries']
                    ]
                return None
            except Exception as e:
                print(f"Search Error: {e}")
                return None

 

 

해당 곡 정보에서 title(제목), duration(재생 시간) 정보를 추출하여 가져온다.

위의 ytdl_opts에서 extract_flat으로 정보를 가져왔다. 해당 방법은 duration을 변환해주지 않으면 '분'과 '초'로 나뉘는게 아니라 '초' 정보만 출력되기 때문에 변환이 필요하다. 재생 시간을 가져오면서 '초'와 '분'으로 나누도록 처리했다.

 

 

2. 디스코드에서 유튜브 검색 커맨드

@commands.command(aliases=['검색'])
    async def search(self, ctx, *, query):
        """유튜브에서 음악을 검색하여 리스트를 출력합니다."""
        try:
            if not ctx.author.voice:
                await ctx.send("음성 채널에 먼저 입장해주세요.")
                return
            
            searching_msg = await ctx.send("검색 중...")

            results = await self.search_youtube(query)
            if not results:
                await searching_msg.delete()
                await ctx.send("검색 결과를 찾을 수 없습니다.")
                return
                
            #검색 결과 표시
            embed = discord.Embed(
                title = "**검색 결과**",
                description = f"검색: {query} \n\n원하는 곡의 번호를 입력해주세요(1~5).\n'취소' 입력 시 검색을 취소합니다.",
                color = 0x3498db
            )
    
            for i, (_,title,duration) in enumerate(results, 1):
                embed.add_field(
                    name=f"{i}. ",
                    value=f"{title}\n재생 시간: {duration}",
                    inline = False
                )
            embed.set_footer(text="30초 내에 선택하지 않으면 자동 취소됩니다.")

            await searching_msg.delete()
            search_msg = await ctx.send(embed=embed)

            def check(m):
                return m.author == ctx.author and m.channel == ctx.channel and \
                       (m.content.isdigit() or m.content.lower() == "취소")

            try:
                msg = await self.bot.wait_for('message', timeout=30.0, check=check)
            except asyncio.TimeoutError:
                await search_msg.delete()
                await ctx.send("시간이 초과되었습니다. 다시 검색해주세요.")
                return

            if msg.content.lower() == "취소":
                await search_msg.delete()
                await ctx.send("검색이 취소되었습니다.")
                return

            choice = int(msg.content)
            if not 1 <= choice <= len(results):
                await search_msg.delete()
                await ctx.send("올바른 번호를 입력해주세요.")
                return
            
            await search_msg.delete()

            selected_url = results[choice-1][0]
            selectd_title = results[choice-1][1]
            
            # 선택된 곡 재생
            if len(self.queue) >= self.max_queue_size:
                await ctx.send(f"대기열이 가득 찼습니다. 최대 {self.max_queue_size}곡까지만 추가할 수 있습니다.")
                return

            async with ctx.typing():
                player = await YTDLSource.from_url(selected_url, loop=self.bot.loop, stream=True)

                if not ctx.voice_client:
                    await ctx.author.voice.channel.connect()

                if not ctx.voice_client.is_playing():
                    self.current_player = player
                    ctx.voice_client.play(self.current_player, 
                                        after=lambda _: asyncio.run_coroutine_threadsafe(self.play_next(ctx), self.bot.loop))
                    await ctx.send(f'지금 재생 중: {selectd_title}')
                else:
                    self.queue.append((selected_url, selectd_title))
                    queue_info = f'대기열에 "{selectd_title}" 노래가 추가되었습니다.\n현재 대기열 ({len(self.queue)}곡):\n'
                    for i, (_, title) in enumerate(self.queue, 1):
                        queue_info += f"{i}. {title}\n"
                    await ctx.send(queue_info)

        except Exception as e:
            if 'search_msg' in locals():
                await search_msg.delete()
            await ctx.send(f"검색 중 오류가 발생했습니다: {str(e)}")
            print(f"검색 오류: {e}")

 

코드가 조금 긴데, 핵심 부분만 살펴본다.

 

 

results = await self.search_youtube(query)
if not results:
    await searching_msg.delete()
    await ctx.send("검색 결과를 찾을 수 없습니다.")
    return

#검색 결과 표시
embed = discord.Embed(
    title = "**검색 결과**",
    description = f"검색: {query} \n\n원하는 곡의 번호를 입력해주세요(1~5).\n'취소' 입력 시 검색을 취소합니다.",
    color = 0x3498db
)

for i, (_,title,duration) in enumerate(results, 1):
    embed.add_field(
        name=f"{i}. ",
        value=f"{title}\n재생 시간: {duration}",
        inline = False
    )
embed.set_footer(text="30초 내에 선택하지 않으면 자동 취소됩니다.")

await searching_msg.delete()
search_msg = await ctx.send(embed=embed)

 

위에서 작성했던 search_youtube 함수를 통해 음악을 검색한다.

 

이후 embed로 검색한 곡 목록을 5개까지 표시한다. enumerate 함수를 사용하면 list와 같은 자료형을 입력받았을 때, 인덱스와 값을 같이 리턴한다. 이후 사용자가 원하는 곡의 번호를 디스코드 메시지로 입력하면 음악을 재생할 수 있다.

 

 

def check(m):
    return m.author == ctx.author and m.channel == ctx.channel and \
           (m.content.isdigit() or m.content.lower() == "취소")

try:
    msg = await self.bot.wait_for('message', timeout=30.0, check=check)
except asyncio.TimeoutError:
    await search_msg.delete()
    await ctx.send("시간이 초과되었습니다. 다시 검색해주세요.")
    return

if msg.content.lower() == "취소":
    await search_msg.delete()
    await ctx.send("검색이 취소되었습니다.")
    return

choice = int(msg.content)
if not 1 <= choice <= len(results):
    await search_msg.delete()
    await ctx.send("올바른 번호를 입력해주세요.")
    return

await search_msg.delete()

 

해당 부분의 코드는 사용자가 검색에 사용한 인덱스가 올바른 번호인지, 또는 곡을 검색하는 도중 취소했거나 시간이 초과되었는지 확인하는 부분의 코드이다.

 

기본적으로 timeout을 30초로 지정했으며 해당 시간동안 아무런 입력이 존재하지 않으면 검색을 취소한다. 사용자가 '취소' 메시지를 디스코드 메시지로 입력해도 검색을 취소할 수 있다.

 

 

selected_url = results[choice-1][0]
selectd_title = results[choice-1][1]

# 선택된 곡 재생
if len(self.queue) >= self.max_queue_size:
    await ctx.send(f"대기열이 가득 찼습니다. 최대 {self.max_queue_size}곡까지만 추가할 수 있습니다.")
    return

async with ctx.typing():
    player = await YTDLSource.from_url(selected_url, loop=self.bot.loop, stream=True)

    if not ctx.voice_client:
        await ctx.author.voice.channel.connect()

    if not ctx.voice_client.is_playing():
        self.current_player = player
        ctx.voice_client.play(self.current_player, 
                            after=lambda _: asyncio.run_coroutine_threadsafe(self.play_next(ctx), self.bot.loop))
        await ctx.send(f'지금 재생 중: {selectd_title}')
    else:
        self.queue.append((selected_url, selectd_title))
        queue_info = f'대기열에 "{selectd_title}" 노래가 추가되었습니다.\n현재 대기열 ({len(self.queue)}곡):\n'
        for i, (_, title) in enumerate(self.queue, 1):
            queue_info += f"{i}. {title}\n"
        await ctx.send(queue_info)

 

사용자가 선택한 번호의 곡에서 url과 title정보를 변수로 저장한다.

이후 기존 재생 코드와 동일하게 곡을 바로 재생하거나 재생중인 곡이 있으면 대기열에 추가한다.

 

 

 

전체 코드

import asyncio
 
import discord
import yt_dlp as youtube_dl
 
from discord.ext import commands
from dico_token import Token
 
# Suppress noise about console usage from errors
# youtube_dl.utils.bug_reports_message = lambda: ''

# <lambda>() got ~ 오류 발생 시 아래 사용
youtube_dl.utils.bug_reports_message = lambda *args, **kwargs: ''

ytdl_format_options = {
    'format': 'bestaudio/best',
    'outtmpl': '%(extractor)s-%(id)s-%(title)s.%(ext)s',
    'restrictfilenames': True,
    'noplaylist': True,
    'nocheckcertificate': True,
    'ignoreerrors': False,
    'logtostderr': False,
    'quiet': True,
    'no_warnings': True,
    'default_search': 'auto',
    'source_address': '0.0.0.0',  # bind to ipv4 since ipv6 addresses cause issues sometimes
}
 
ffmpeg_options = {
    'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5',
    'options': '-vn',
}
 
ytdl = youtube_dl.YoutubeDL(ytdl_format_options)

# youtube 음악과 로컬 음악의 재생을 구별하기 위한 클래스 작성.
class YTDLSource(discord.PCMVolumeTransformer):
    def __init__(self, source, *, data, volume=0.5):
        super().__init__(source, volume)
 
        self.data = data
 
        self.title = data.get('title')
        self.url = data.get('url')
 
    @classmethod
    async def from_url(cls, url, *, loop=None, stream=False):
        loop = loop or asyncio.get_event_loop()
        data = await loop.run_in_executor(None, lambda: ytdl.extract_info(url, download=not stream))
 
        if 'entries' in data:
            # take first item from a playlist
            data = data['entries'][0]
 
        filename = data['url'] if stream else ytdl.prepare_filename(data)
        return cls(discord.FFmpegPCMAudio(filename, **ffmpeg_options), data=data)
 
class Music(commands.Cog):
    def __init__(self, bot):
        self.bot = bot
        self.queue = []
        self.current_player = None
        self.max_queue_size = 10  # 대기열 최대 길이 설정
        self.loop = False
        self.loop_current = False
 
    async def play_next(self, ctx):
        if not ctx.voice_client:
            return
        
        try:
            if self.loop_current and self.current_player: 
                current_url = self.current_player.data['url']
                self.current_player = await YTDLSource.from_url(current_url, loop=self.bot.loop, stream=True)
                ctx.voice_client.play(self.current_player, after=lambda _: asyncio.run_coroutine_threadsafe(self.play_next(ctx), self.bot.loop))
                return
 
            if self.loop and self.current_player:
                current_url = self.current_player.data['url']
                current_title = self.current_player.title
                self.queue.append((current_url, current_title))
 
            if len(self.queue) > 0:
                # 다음 곡 재생
                next_url, next_title = self.queue.pop(0)
                self.current_player = await YTDLSource.from_url(next_url, loop=self.bot.loop, stream=True)
                ctx.voice_client.play(self.current_player, after=lambda _: asyncio.run_coroutine_threadsafe(self.play_next(ctx), self.bot.loop))
                
                # play_next에서 호출될 때는 메시지를 보내지 않음
                if ctx.command and ctx.command.name == "skip":
                    await ctx.send(f'지금 재생 중: {self.current_player.title}')
                    
        except Exception as e:
            await ctx.send(f"재생 중 오류가 발생했습니다: {str(e)}")
            print(f"재생 오류: {e}")
            # 오류 발생 시 대기열에 기존 재생 곡 추가
            if current_url and current_title:
                self.queue.insert(0, (current_url, current_title))
 
    async def search_youtube(self, query):
        ytdl_opts = {
            'format': 'bestaudio/best',
            'quiet': True,
            'no_warnings': True,
            'default_search': 'ytsearch5',
            'extract_flat': True,
            'skip_download': True,
            'force_generic_extractor': True,
        }
 
        with youtube_dl.YoutubeDL(ytdl_opts) as ytdl:
            try:
                result = ytdl.extract_info(f"ytsearch5:{query}", download= False)
                if 'entries' in result:
                    # list comprehension
                    return [
                        (
                            f"https://www.youtube.com/watch?v={entry['id']}",
                            entry.get('title', 'N/A'),
                            f"{int(entry.get('duration', 0)) // 60}:{int(entry.get('duration', 0)) % 60:02d}" if entry.get('duration') else 'N/A'
                        )
                        for entry in result['entries']
                    ]
                return None
            except Exception as e:
                print(f"Search Error: {e}")
                return None
 
    @commands.command(aliases=["입장"])
    async def join(self, ctx):
        """음성 채널에 입장합니다"""
        if not ctx.author.voice:
            await ctx.send("음성 채널에 먼저 입장해주세요.")
            return
            
        channel = ctx.author.voice.channel
        
        if ctx.voice_client is not None:
            return await ctx.voice_client.move_to(channel)
 
        await channel.connect()
        await ctx.send(f"{channel} 채널에 입장했습니다.")
    
    @commands.command(aliases=["다음"])
    async def skip(self, ctx):
        """대기열에서 다음 곡을 재생합니다"""
        if not ctx.voice_client:
            await ctx.send("봇이 음성 채널에 연결되어 있지 않습니다.")
            return
            
        if not self.queue:  # 대기열이 비어있는지 확인
            await ctx.send("다음 재생할 곡이 대기열에 없습니다.\n음악을 계속 재생하시려면 음악을 추가해주세요.")
            return
            
        # 반복 모드에서 스킵 제한.
        if self.loop or self.loop_current:
            await ctx.send("반복 모드를 종료하고 skip 명령어를 입력해주세요.")
            return
 
        try:
            # 현재 재생중인 음악 중지
            if ctx.voice_client.is_playing():
                ctx.voice_client.stop()
                
            next_url, next_title = self.queue.pop(0)
            self.current_player = await YTDLSource.from_url(next_url, loop=self.bot.loop, stream=True)
 
            ctx.voice_client.play(self.current_player, after=lambda _: asyncio.run_coroutine_threadsafe(self.play_next(ctx), self.bot.loop))
            await ctx.send(f'지금 재생 중: {self.current_player.title}')
        except Exception as e:
            await ctx.send(f"재생 중 오류가 발생했습니다: {str(e)}")
            print(f"재생 오류: {e}")
            # 오류 발생 시 대기열에 기존 재생 곡 추가
            if next_url and next_title:
                self.queue.insert(0, (next_url, next_title))
   
    @commands.command(aliases=["재생"])
    async def play(self, ctx, *, query):
        """URL에서 음악을 재생하고 대기열에 추가합니다"""
        
        try:
            if not ctx.author.voice:
                await ctx.send("음성 채널에 먼저 입장해주세요.")
                return
 
            if not ctx.voice_client:
                await ctx.author.voice.channel.connect()
                
            if len(self.queue) >= self.max_queue_size:
                await ctx.send(f"대기열이 가득 찼습니다. 최대 {self.max_queue_size}곡까지만 추가할 수 있습니다.")
                return
            
            #youtube 검색
            if not query.startswith(('http://', 'https://')):
                async with ctx.typing():
                    url = await self.search_youtube(query)
                    if not url:
                        await ctx.send("검색 결과를 찾을 수 없습니다.")
                        return
            else:
                url = query            
 
            # 추가하려는 곡의 정보를 미리 가져옴
            async with ctx.typing():
                player = await YTDLSource.from_url(url, loop=self.bot.loop, stream=True)
 
                if not ctx.voice_client.is_playing():
                    self.current_player = player
                    ctx.voice_client.play(self.current_player, after=lambda _: asyncio.run_coroutine_threadsafe(self.play_next(ctx), self.bot.loop))
                    await ctx.send(f'지금 재생 중: {self.current_player.title}')
                else:
                    self.queue.append((url, player.title))
                    queue_info = f'대기열에 "{player.title}" 노래가 추가되었습니다.\n현재 대기열 ({len(self.queue)}곡):\n'
                    for i, (_, title) in enumerate(self.queue, 1):
                        queue_info += f"{i}. {title}\n"
                    await ctx.send(queue_info)
                    
        except Exception as e:
            await ctx.send(f"음악을 추가하는 중 오류가 발생했습니다: {str(e)}")
            print(f"재생 오류: {e}")
 
    @commands.command(aliases=["음량","볼륨","소리"])
    async def volume(self, ctx, volume: int):
        """플레이어의 볼륨을 조절합니다"""
 
        if ctx.voice_client is None:
            return await ctx.send("음성 채널에 연결되어 있지 않습니다.")
 
        if not 0 <= volume <= 100:
            return await ctx.send("볼륨은 0에서 100 사이의 값이어야 합니다.")
 
        ctx.voice_client.source.volume = volume / 100
        await ctx.send(f"볼륨이 {volume}%로 변경되었습니다")
 
    @commands.command()
    async def stop(self, ctx):
        """재생을 멈추고 음성 채널에서 나갑니다"""
        if not ctx.voice_client:
            await ctx.send("봇이 음성 채널에 연결되어 있지 않습니다.")
            return
            
        self.queue.clear()
        self.loop_current = False
        self.loop = False
        self. current_player = None
 
        if ctx.voice_client.is_playing():
            ctx.voice_client.stop()
        await ctx.voice_client.disconnect()
        await ctx.send("재생을 멈추고 채널에서 나갔습니다.")
        
    @commands.command()
    async def pause(self, ctx):
        """음악을 일시정지합니다"""
        if not ctx.voice_client:
            await ctx.send("봇이 음성 채널에 연결되어 있지 않습니다.")
            return
 
        if ctx.voice_client.is_paused() or not ctx.voice_client.is_playing():
            await ctx.send("음악이 이미 일시 정지 중이거나 재생 중이지 않습니다.")
            return
            
        ctx.voice_client.pause()
        await ctx.send("음악이 일시정지되었습니다.")
            
    @commands.command()
    async def resume(self, ctx):
        """일시정지된 음악을 다시 재생합니다"""
        if not ctx.voice_client:
            await ctx.send("봇이 음성 채널에 연결되어 있지 않습니다.")
            return
 
        if ctx.voice_client.is_playing() or not ctx.voice_client.is_paused():
            await ctx.send("음악이 이미 재생 중이거나 재생할 음악이 존재하지 않습니다.")
            return
            
        ctx.voice_client.resume()
        await ctx.send("음악이 다시 재생됩니다.")
 
    @commands.command(aliases=["q","플레이리스트","대기열"])
    async def queue(self, ctx):
        """현재 대기열을 보여줍니다"""
        if len(self.queue) == 0:
            await ctx.send("대기열이 비어있습니다.")
            return
            
        queue_list = f"현재 대기열 ({len(self.queue)}/{self.max_queue_size}곡):\n"
        for i, (_, title) in enumerate(self.queue, 1):
            queue_list += f"{i}. {title}\n"
        await ctx.send(queue_list)
 
    @commands.command()
    async def now(self, ctx):
        """현재 재생중인 음악의 제목을 보여줍니다"""
        if not ctx.voice_client or not ctx.voice_client.is_playing():
            await ctx.send("현재 재생 중인 음악이 없습니다.")
            return
            
        if self.current_player:
            await ctx.send(f"현재 재생 중: {self.current_player.title}")
        else:
            await ctx.send("현재 재생 중인 음악 정보를 가져올 수 없습니다.")
 
    @commands.command(aliases=["삭제", "제거"])
    async def remove(self, ctx, index: int):
        """대기열에서 특정 곡을 삭제합니다"""
        if len(self.queue) == 0:
            await ctx.send("대기열이 비어있습니다.")
            return
            
        if not 1 <= index <= len(self.queue):
            await ctx.send(f"올바른 번호를 입력해주세요. (1 ~ 대기열 길이`({len(self.queue)})`)")
            return
            
        _, removed_title = self.queue.pop(index-1)
        await ctx.send(f"대기열에서 {index}번 곡 '{removed_title}'이(가) 제거되었습니다.")
    
    @commands.command(aliases=['반복'])
    async def loop(self, ctx):
        """전체 곡을 반복 재생합니다."""
        if not ctx.voice_client:
            await ctx.send("음성 채널에 연결되어 있지 않습니다.")
            return
        
        self.loop = not self.loop
        self.loop_current = False # 한곡 반복과 전체 반복 구분
 
        if self.loop:
            await ctx.send("전체 곡 반복이 활성화되었습니다.")
        else:
            await ctx.send("전체 곡 반복이 비활성화되었습니다.")
 
    @commands.command(aliases=['한곡반복','현재곡반복'])
    async def loop_one(self, ctx):
        """현재 재생 중인 곡을 반복 재생합니다."""
        if not ctx.voice_client or not ctx.voice_client.is_playing():
            await ctx.send("현재 재생 중인 곡이 없거나 음성 채널에 연결되어 있지 않습니다.")
            return
        
        self.loop_current = not self.loop_current
        self.loop = False # 한곡 반복과 전체 반복 구분
 
        if self.loop_current:
            await ctx.send("현재 곡 반복이 활성화되었습니다.")
        else:
            await ctx.send("현재 곡 반복이 비활성화되었습니다.")
 
    @commands.command(aliases=['검색'])
    async def search(self, ctx, *, query):
        """유튜브에서 음악을 검색하여 리스트를 출력합니다."""
        try:
            if not ctx.author.voice:
                await ctx.send("음성 채널에 먼저 입장해주세요.")
                return
            
            searching_msg = await ctx.send("검색 중...")
 
            results = await self.search_youtube(query)
            if not results:
                await searching_msg.delete()
                await ctx.send("검색 결과를 찾을 수 없습니다.")
                return
                
            #검색 결과 표시
            embed = discord.Embed(
                title = "**검색 결과**",
                description = f"검색: {query} \n\n원하는 곡의 번호를 입력해주세요(1~5).\n'취소' 입력 시 검색을 취소합니다.",
                color = 0x3498db
            )
    
            for i, (_,title,duration) in enumerate(results, 1):
                embed.add_field(
                    name=f"{i}. ",
                    value=f"{title}\n재생 시간: {duration}",
                    inline = False
                )
            embed.set_footer(text="30초 내에 선택하지 않으면 자동 취소됩니다.")
 
            await searching_msg.delete()
            search_msg = await ctx.send(embed=embed)
 
            def check(m):
                return m.author == ctx.author and m.channel == ctx.channel and \
                       (m.content.isdigit() or m.content.lower() == "취소")
 
            try:
                msg = await self.bot.wait_for('message', timeout=30.0, check=check)
            except asyncio.TimeoutError:
                await search_msg.delete()
                await ctx.send("시간이 초과되었습니다. 다시 검색해주세요.")
                return
 
            if msg.content.lower() == "취소":
                await search_msg.delete()
                await ctx.send("검색이 취소되었습니다.")
                return
 
            choice = int(msg.content)
            if not 1 <= choice <= len(results):
                await search_msg.delete()
                await ctx.send("올바른 번호를 입력해주세요.")
                return
            
            await search_msg.delete()
 
            selected_url = results[choice-1][0]
            selectd_title = results[choice-1][1]
            
            # 선택된 곡 재생
            if len(self.queue) >= self.max_queue_size:
                await ctx.send(f"대기열이 가득 찼습니다. 최대 {self.max_queue_size}곡까지만 추가할 수 있습니다.")
                return
 
            async with ctx.typing():
                player = await YTDLSource.from_url(selected_url, loop=self.bot.loop, stream=True)
 
                if not ctx.voice_client:
                    await ctx.author.voice.channel.connect()
 
                if not ctx.voice_client.is_playing():
                    self.current_player = player
                    ctx.voice_client.play(self.current_player, 
                                        after=lambda _: asyncio.run_coroutine_threadsafe(self.play_next(ctx), self.bot.loop))
                    await ctx.send(f'지금 재생 중: {selectd_title}')
                else:
                    self.queue.append((selected_url, selectd_title))
                    queue_info = f'대기열에 "{selectd_title}" 노래가 추가되었습니다.\n현재 대기열 ({len(self.queue)}곡):\n'
                    for i, (_, title) in enumerate(self.queue, 1):
                        queue_info += f"{i}. {title}\n"
                    await ctx.send(queue_info)
 
        except Exception as e:
            if 'search_msg' in locals():
                await search_msg.delete()
            await ctx.send(f"검색 중 오류가 발생했습니다: {str(e)}")
            print(f"검색 오류: {e}")
 
 
    @play.before_invoke
    @skip.before_invoke
    async def ensure_voice(self, ctx):
        if ctx.voice_client is None:
            if ctx.author.voice:
                await ctx.author.voice.channel.connect()
            else:
                await ctx.send("음성 채널에 먼저 입장해주세요.")
                raise commands.CommandError("사용자가 음성 채널에 연결되어 있지 않습니다.")
 
 
intents = discord.Intents.default()
intents.message_content = True
 
bot = commands.Bot(
    command_prefix=commands.when_mentioned_or("!"),
    description='Relatively simple music bot example',
    intents=intents,
)
 
 
@bot.event
async def on_ready():
    print(f'Logged in as {bot.user} (ID: {bot.user.id})')
    print('------')
 
 
async def main():
    async with bot:
        await bot.add_cog(Music(bot))
        await bot.start(Token)
 
 
asyncio.run(main())

 

유튜브 검색 기능 실행 명령어:

!검색, !search

ex)

!검색 '음악제목'

!검색 '아티스트 이름'

 

 

 

해당 코드는 loop 기능과 유튜브 검색, 플레이리스트 기능을 모두 한 파일로 통합해둔 코드.

 

기존 작성해둔 유튜브 음악 재생 봇 코드에 클래스 부분만 복사해서 붙여넣고 사용하면 된다. 루프 기능까지 작성해둔 코드가 존재한다면  search_youtube 부분과 커맨드의 search 부분만 복사하여 사용하면 된다.

 

 

실행 결과

 

! 검색 아티스트 명을 입력하면 연관된 5개의 음악 목록을 출력한다.

 

30초의 시간이 지나면 시간 초과 메시지를 출력하고 embed 메시지를 제거한다.

 

취소 메시지를 입력하면 검색을 취소하고 embed 메시지를 제거한다.

 


오류 Q&A

** 해당 코드에서 사용한 Python 버전은 3.12.8 입니다.

Q: 봇을 실행시키고 명령어는 동작하는데, 봇이 아무 반응이 없어요(소리가 나오지 않아요)

A: 
1. 디스코드 개발자 포탈의 OAuth2 설정에서 bot의 권한이 Admin이거나, bot 권한 중 음성과 text 권한을 허용해주셔야 합니다.
2. 디스코드 개발자 포탈의 Privileged Gateway Intents 설정의 3가지 권한이 모두 허용되어 있어야 합니다.
3. 디스코드의 사용자 설정 - 음성 및 오디오 - 출력 장치의 설정이 사용자가 설정한 스피커로 설정되어 있는지 확인해주셔야 합니다.
4. IDE(VSCode) 터미널에서 오류 메시지가 출력되지는 않았는지 확인하셔야 합니다.

5. 디스코드 봇이 들어오고 초록색으로 표시되는 경우, !volume 50 명령어를 입력하셔서 볼륨 설정이 잘못되지 않았는지 확인하셔야 합니다.

 

위 방법 모두 동작하지 않는다면 python 버전을 최신(3.12) 버전으로 재설치 후 라이브러리를 pip install 해주시기 바랍니다.

pip install discord[voice]
pip install discord.py
pip install yt-dlp
pip install PyNaCl

 

파이썬 재설치 후 ffmpeg를 파이썬이 설치된 경로에 넣어주시기 바랍니다.

 

참조: VSCode에서 파이썬 버전 설정 방법

1. VSCode 우측 하단 파이썬 버전 클릭

 

2. 파이썬 버전을 3.12로 설정

 

Python 3.9 버전은 실행 불가 확인. 최신 버전의 파이썬 사용을 권장합니다. 

 

 

Q: TypeError가 발생해요.

A: 오류 메시지가 출력되는 부분에서 잘못 작성한 코드 또는 전체 코드를 빠뜨린 부분이 없는지 확인 부탁드립니다.

 

Q: 음악 재생이 원활하지 않거나 재생하는 데 시간이 걸려요.

A: 노트북과 같은 무선 네트워크 환경 등 네트워크가 원활하지 않은 환경에서 코드 실행 시 음악의 앞 부분이 잘려서 출력되는 현상 또는 음악이 밀리는 현상이 발생할 수 있습니다. 네트워크 환경이 원활한 상태에서 코드를 실행하시길 권장드립니다.

또는 ffmpeg_options를 아래와 같이 설정해보시기 바랍니다.

FFMPEG_OPTIONS = {
    'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5 -nostdin',
    'options':'-vn -preset ultrafast -ar 48000 -ac 2 -b:a 192k'
}

 

Q: dico_token이라는 파일이 없다고 떠요

A: 

1. dico_token.py라는 이름으로 파일을 만들어줍니다.(파일 이름은 사용자 마음대로 지정하셔도 됩니다.)

 

2. 디스코드 개발자 포털 - Settings - Bot - Reset Token 버튼을 클릭하여 토큰을 발급받아 복사합니다.

 

3. 발급받아 복사한 토큰을 아까 만들어 두었던 파일에 붙여넣기 합니다.

 

 

기타 오류 사항은 아래 페이지를 확인해주시기 바랍니다.

 

https://kante-kante.tistory.com/38

반응형