728x90
반응형

Code Flow

  위 흐름도와 같이 1. 전체 물건 조회 -> 2. 조건에 따른 선별 -> 3. 이미지 저장 및 ppt 저장 순으로 코드 진행된다. 이미지로 저장되는 매각물건명세서 특성 상, 대항력 임차인은 직접 확인 필요하다. (이미지 저장까지만 자동화)

main

import pandas as pd
import numpy as np
from tqdm import tqdm
import re, time, datetime, random

from haversine import haversine
from selenium.webdriver.common.by import By

from module.naver import naverAPIModule
from module.scraper import customScraper
from module.publicData import publicData
from module.visualization import customPPTX, saveTransactionChart

# custom 모듈 가져오기
naverAPI = naverAPIModule()
scrapper = customScraper()
publicInfo = publicData()
prs = customPPTX('template.pptx')

# 확인할 날짜 선택
today = datetime.datetime.now().strftime('%Y_%m_%d')
fromdate = (datetime.datetime.now() + datetime.timedelta(days=0)).strftime('%Y.%m.%d')
enddate = (datetime.datetime.now() + datetime.timedelta(days=7)).strftime('%Y.%m.%d')
print(today, fromdate, enddate)

########################################
# 전체 경매 물건 list 가져오기
########################################
scrapper.startDriver('https://www.courtauction.go.kr/RetrieveMainInfo.laf')
aution_item = scrapper.getAutionitemList(fromdate, enddate)
scrapper.endDriver()
print('전체 물건 개수 :', len(aution_item))

########################################
# 저장해둔 지하철 정보 불러오기
########################################
subway_info = pd.read_csv('subway_info.csv', encoding='cp949')
subway_name = list(subway_info['역사명'])
subway_corr = subway_info[['위도','경도']].to_numpy(dtype=float)

aution_item[['경도','위도','지하철역','거리']] = None
for idx, row in aution_item.iterrows():
    try: # 좌표 검색
        data = naverAPI.get_loc(row['소재지'])
        aution_item.loc[idx,'경도'] = data['addresses'][0]['x']
        aution_item.loc[idx,'위도'] = data['addresses'][0]['y']
    except: pass # 좌표 검색이 안되는 경우는 공란으로 두고 넘어감

for idx, a in enumerate(aution_item[['위도','경도']].to_numpy(dtype=float)):
    everyDistance = np.apply_along_axis(lambda x : haversine(x,a,unit='m'), axis=1, arr=subway_corr)
    aution_item.loc[idx,'지하철역'] = subway_name[np.argmin(everyDistance)]
    aution_item.loc[idx,'거리'] = int(np.min(everyDistance))

print('좌표 검색 실패 물건 개수 : ', len(aution_item[aution_item['위도'].isna()]))
if len(aution_item[aution_item['위도'].isna()]) : display(aution_item[aution_item['위도'].isna()])
aution_item = aution_item[aution_item['거리'] <= 750].reset_index(drop=True)
print('거리 조건 제외 후 전체 물건 수 : ', len(aution_item))

########################################
# 면적 조건을 설정
########################################
for n, row in aution_item.iterrows():
    area = re.sub('[^0-9.]', " ", row['내역']) # 숫자와 '.'만 남기고 전부 제거
    area = re.sub(r" +", " ", area) # 연속 띄어스기는 하나로 통일
    area = area.split(" ") # 띄어쓰기로 문자열 나눔
    # 공란 혹은 '.' 만 있는 경우를 제외하고 전부 숫자로 바꾼 뒤 총합하여 총면적을 계산
    area = [float(a) for a in area if len(a) > 0 and "." in str(a) and len(a) != 1]
    aution_item.loc[n,'면적'] = round(sum(area),2)

aution_item = aution_item[aution_item['면적'] <= 120].reset_index(drop=True) # 큰 평수 제외
aution_item = aution_item[aution_item['면적'] >= 50].reset_index(drop=True) # 작은 평수 제외
print('면적 조건 제외 후 전체 물건 수 : ', len(aution_item))

########################################
# 지번주소, 구코드, 동코드 추가
########################################
# 지번 주소 가져오기
scrapper.startDriver('https://www.juso.go.kr/openIndexPage.do')
aution_item['지번주소'] = None
for idx, row in tqdm(aution_item.iterrows(), total = len(aution_item)):
    aution_item.loc[idx,'지번주소'] = scrapper.getchangeAddress(row['소재지'])
scrapper.endDriver()

# 저장해둔 코드 정보 불러오기
code_df = pd.read_csv('code.csv')
aution_item[['구코드','동코드']] = None
for idx, row in aution_item.iterrows():
    # 물건이 법정구에 해당하는 row만 가져옴
    gu_df = code_df[code_df['구'] == row['소재지'].split(' ')[1]]
    # 각 동 별로 물건이 있는 동인지 확인
    for _, gu_row in gu_df.iterrows():
        if gu_row['동'] in row['소재지']: # 해당 동일 경우 코드 저장 후 종료
            aution_item.loc[idx, '구코드'] = gu_row['구코드']
            aution_item.loc[idx, '동코드'] = gu_row['동코드']
            break
    else: # 동 정보가 없으면 지번 주소로 다시 찾음.
        for _, gu_row in gu_df.iterrows():
            if gu_row['동'] in row['지번주소']:
                aution_item.loc[idx, '구코드'] = gu_row['구코드']
                aution_item.loc[idx, '동코드'] = gu_row['동코드']
                break

print('동 정보가 없는 물건 : ', len(aution_item[aution_item['동코드'].isna()]))
if len(aution_item[aution_item['동코드'].isna()]) : display(aution_item[aution_item['동코드'].isna()])

########################################
# 건물정보 추가 후 조건에 맞춰 필터링
########################################

aution_item[['건폐율','용적률','사용승인일','세대수']] = None
fail_list = []
for idx, row in tqdm(aution_item.iterrows(), total = len(aution_item)):
    api_df = publicInfo.getBuildingLedger(row)

    # 최종적으로 표제부가 확인되면 정보 저장
    if len(api_df) > 0:
        api_df[['건축면적','용적률산정연면적','대지면적']] = api_df[['건축면적','용적률산정연면적','대지면적']].astype(float)
        api_df[['세대수']] = api_df[['세대수']].astype(int)

        # 대지면적이 0으로 정보가 없는 경우 건폐율, 용적률 정보는 제외
        if sum(api_df['대지면적']) == 0:
            aution_item.loc[idx,'건폐율'] = 9999.0
            aution_item.loc[idx,'용적률'] = 9999.0
        else:
            aution_item.loc[idx,'건폐율'] = round((sum(api_df['건축면적']) / sum(api_df['대지면적'])) * 100,2)
            aution_item.loc[idx,'용적률'] = round((sum(api_df['용적률산정연면적']) / sum(api_df['대지면적'])) * 100,2)
        aution_item.loc[idx,'세대수'] = int(sum(api_df['세대수']))
        aution_item.loc[idx,'사용승인일'] = api_df['사용승인일'][0]
    # 확인되지 않으면 제외
    else:
        fail_list.append((idx, row['사건번호'], row['소재지']))

[print('건물 정보 불러오기 실패 물건 :', *i) for i in fail_list]
aution_item = aution_item[~aution_item['사용승인일'].isna()]
aution_item = aution_item[aution_item['세대수'] >= 300].reset_index(drop=True)
aution_item['year'] = aution_item['사용승인일'].str.slice(start=0,stop=4)
aution_item['year'] = aution_item['year'].astype(int)
aution_item = aution_item[aution_item['year'] >= 1995].reset_index(drop=True)

print('최종 물건 수 :', len(aution_item))

########################################
# 거래 정보 가져오기
########################################
trade_data, loan_data = publicInfo.getTransaction(code_df, 'trade.csv', 'loan.csv')

########################################
# 물건별로 정보 상세 검색
########################################
scrapper.startDriver('https://www.courtauction.go.kr/RetrieveMainInfo.laf')
scrapper.driver.find_element(By.XPATH, '이전 버튼 XPATH').click()
time.sleep(2 * (random.random()+1))

for n, row in tqdm(aution_item.iterrows(), total = aution_item.shape[0]):
    # 물건별로 정보 상세 검색
    isImage = scrapper.getAutionDetail(row)
    # 네이버 지도 저장
    naverAPI.save_map(f'./temp/building_map.png', 13, row['경도'], row['위도'], int(8.83 * 37.79), int(6.87 * 37.79))
    # 매매, 전월세 실거래 차트 그리기
    isImage['isChart'] = saveTransactionChart(row, trade_data, loan_data)
    # 물건별 slide 완성          
    prs.setAutionSlide(row, isImage)

# 프레젠테이션 파일 저장
prs.save(f"./{today}.pptx")
scrapper.endDriver()

  Naver API, PublicDataReader, Scraping, Visualization 으로 모듈, 클래스를 나누어 main을 구현했다. 범용성이 낮은 함수들이 대부분이나 main이 너무 길어지는 것을 방지하기 위해 모듈로 구성했으며,물건 별로 신규 column을 계산하거나, 필터링을 하는 기능은 main에 구현했다.

728x90

Naver API Module

import requests, io
import PIL

class naverAPIModule():
    def __init__(self):
        self.client_id = '----' # 네이버 클라우드 시스템에서 확인 가능
        self.client_secret = '----' # 네이버 클라우드 시스템에서 확인 가능

    def get_loc(self, address):
        url = f'https://naveropenapi.apigw.ntruss.com/map-geocode/v2/geocode?query={address}'
        headers = {'X-NCP-APIGW-API-KEY-ID': self.client_id, 'X-NCP-APIGW-API-KEY': self.client_secret}
        r = requests.get(url, headers=headers)
        data = r.json()
        
        return data
    
    def save_map(self, directory, level, loc, lac, w, h):        
        # 네이버 지도 저장
        url = "https://naveropenapi.apigw.ntruss.com/map-static/v2/raster"
        headers = {'X-NCP-APIGW-API-KEY-ID': self.client_id, 'X-NCP-APIGW-API-KEY': self.client_secret}
        params = {
            'center' : f"{loc},{lac}", # 중심 좌표
            'level' : level,           # 줌 레벨 - 0 ~ 20
            'w' : w,                   # 가로 세로 크기 (픽셀)
            'h' : h,                   # 가로 세로 크기 (픽셀)
            'maptype' : "basic",       # 지도 유형 - basic, traffic, satellite, satellite_base, terrain
            'format' : "png8",         # 반환 이미지 형식 - jpg, jpeg, png8, png
            'scale' : 1,               # 고해상도 디스펠레이 지원을 위한 옵션 - 1, 2
            'markers' : f"""type:d|size:mid|pos:{loc} {lac}|color:Default""",  # 마커
            'lang' : "ko",             # 라벨 언어 설정 - ko, en, ja, zh
            'public_transit' : True,   # 대중교통 정보 노출 - Boolean
            'dataversion' : "",        # 서비스에서 사용할 데이터 버전 파라미터 전달 CDN 캐시 무효화
        }
        res = requests.get(url, params=params, headers=headers)

        image_data = io.BytesIO(res.content)
        image = PIL.Image.open(image_data)
        image.save(directory, quality=100)

        return None

  ID와 secert은 개인별 입력값을 넣어서 활용 가능하다. 다른 자동화 시스템을 만들때도 동일한 모듈을 활용할 예정이다.

PublicDataReader Module

import pandas as pd
import numpy as np

import datetime

from dateutil.relativedelta import relativedelta
from PublicDataReader import TransactionPrice, BuildingLedger

class publicData:
    def __init__(self):
        self.building = BuildingLedger("----")
        self.trans = TransactionPrice("----")

    def getBuildingLedger(self, row) -> pd.DataFrame:
        # '번'과 '지'를 나누어 확인.
        bunji = row['지번주소'].split(" ")[3]
        bun = bunji.split("-")[0]
        if "-" in bunji: ji = bunji.split("-")[1]
        else: ji = '0'

        # '번', '지'를 둘 다 인자로 전달 후 data 확인
        df = self.building.get_data(ledger_type ="표제부", sigungu_code = row['구코드'], bdong_code = row['동코드'], bun = bun, ji = ji)
        if len(df) == 0: # data가 전혀 없으면 '지'를 제외 후 검색
            df = self.building.get_data(ledger_type ="표제부", sigungu_code = row['구코드'], bdong_code = row['동코드'], bun = bun)
            # 건물명이 소재지에 있는지 확인
            df = df[[t['건물명'] in row['소재지'] for _,t in df.iterrows()]]

        return df
    
    def getTransaction(self, code_df, trade_directory, loan_directory) -> pd.DataFrame:
        # 월초를 감안하여 이전달, 이번달 다운로드
        lastmonth = (datetime.datetime.now() - relativedelta(months=1)).strftime('%Y%m')
        month = datetime.datetime.now().strftime('%Y%m')

        # 법정동 코드 앞 5자리 (구 단위)
        downlist = code_df['구코드'].unique()
        # 기존에 저장해둔 매매 list
        trade_data = pd.read_csv(trade_directory, encoding='utf-8-sig')
        loan_data = pd.read_csv(loan_directory, encoding='utf-8-sig')

        for d in downlist:
            for m in [lastmonth, month]:
                # 최신 data를 기존 csv에 추가
                pd.concat([trade_data,self.trans.get_data(property_type="아파트", trade_type="매매", sigungu_code=d, year_month=m)], ignore_index= True)
                pd.concat([loan_data,self.trans.get_data(property_type="아파트", trade_type="전월세", sigungu_code=d, year_month=m)], ignore_index= True)

        # 원본 저장
        trade_data.drop_duplicates().to_csv(trade_directory, index=False, encoding='utf-8-sig')
        loan_data.drop_duplicates().to_csv(loan_directory, index=False, encoding='utf-8-sig')

        trade_data = trade_data[['지역코드','도로명','법정동','지번','아파트','층','전용면적','년','월','일','거래금액','도로명건물본번호코드','도로명건물부번호코드','법정동읍면동코드','해제여부']]
        trade_data = trade_data[trade_data['해제여부'].isnull()].drop(columns=['해제여부']) # 취소 제외
        trade_data[['전용면적','거래금액']] = trade_data[['전용면적','거래금액']].astype(float)
        trade_data['거래금액'] = np.round(trade_data['거래금액'] / 10000,3) # 억 단위로 변경
        # datetime type의 거래일자 column 생성
        trade_data[['년','월','일']] = trade_data[['년','월','일']].astype(str)
        trade_data['거래일자'] = trade_data['년'] + "-" + trade_data['월'] + "-" + trade_data['일']
        trade_data['거래일자'] = trade_data['거래일자'].astype('datetime64[ns]')
        # 도로명 없는 경우는 등기 전으로 제외 처리
        trade_data.dropna(subset=['도로명건물본번호코드'], axis=0, inplace=True)
        trade_data[['도로명건물본번호코드','도로명건물부번호코드']] = trade_data[['도로명건물본번호코드','도로명건물부번호코드']].astype(int).astype(str)
        trade_data['도로명코드'] = trade_data['도로명'] + " " + trade_data['도로명건물본번호코드']
        trade_data['도로명코드'] = np.where(trade_data['도로명건물부번호코드'] == '0', trade_data['도로명코드'], trade_data['도로명코드'] + '-' + trade_data['도로명건물부번호코드'])
        trade_data['도로명코드'] = np.where(trade_data['도로명코드'].isna(), trade_data['도로명건물본번호코드'] + '-' + trade_data['도로명건물부번호코드'], trade_data['도로명코드'])

        loan_data = loan_data[['지역코드','법정동','지번','아파트','층','전용면적','년','월','일','보증금액','월세금액']]
        loan_data[['전용면적','보증금액','월세금액']] = loan_data[['전용면적','보증금액','월세금액']].astype(float)
        # datetime type의 거래일자 column 생성
        loan_data[['년','월','일']] = loan_data[['년','월','일']].astype(str)
        loan_data['거래일자'] = loan_data['년'] + "-" + loan_data['월'] + "-" + loan_data['일']
        loan_data['거래일자'] = loan_data['거래일자'].astype('datetime64[ns]')
        # 월세 -> 전세 전환율 5.5% 일괄적용
        loan_data['전월세'] = np.where(loan_data['월세금액'] == 0, '전세', '월세')
        loan_data['보증금액'] = loan_data['보증금액'] + loan_data['월세금액'] * 100 / 5.5 * 12
        loan_data['보증금액'] = np.round(loan_data['보증금액'] / 10000,3) # 억 단위로 변경

        # 비교 column string 변경
        trade_data[['지역코드']] = trade_data[['지역코드']].astype(str)
        loan_data[['지역코드']] = loan_data[['지역코드']].astype(str)

        return trade_data, loan_data

  공공데이터포털에서 확인가능한 서비스키를 __init__에 입력하여 활용 가능하다. 매매, 전월세 거래의 경우 raw 데이터를 저장하고, 전처리된 data는 return으로 main에 넘겨준다.

Scrapping Module

import pandas as pd
import numpy as np

import re, time, random
import urllib.request
import pyautogui
import PIL

from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import Select

class customScraper:
    def startDriver(self, url):
        self.driver = webdriver.Chrome()
        self.driver.get(url)
        time.sleep(2 * (random.random()+1))
        # clear pop-up
        main = self.driver.window_handles
        for i, x in enumerate(main):
            if i != 0:
                self.driver.switch_to.window(x)
                self.driver.close() 
                time.sleep(1 * (random.random()+1))
        self.driver.switch_to.window(main[0])
        time.sleep(1 * (random.random()+1))

    def endDriver(self):
        self.driver.quit()

    def getchangeAddress(self, addr) -> str:
        # 클래스 밖에서 driver를 on/off
        try:
            inputAddr = self.driver.find_element(By.ID, 'inputSearchAddr')
            inputAddr.clear()
            inputAddr.send_keys(addr)
            time.sleep(0.5 * (random.random()+1))
            # 검색 후 페이지 이동 대기
            self.driver.find_element(By.CLASS_NAME, 'btn_search.searchBtn').click()
            time.sleep(2 * (random.random()+1))
            text = self.driver.find_element(By.CLASS_NAME, 'subejct_2').find_element(By.CLASS_NAME, 'roadNameText').text
            time.sleep(0.5 * (random.random()+1))
        except:
            self.driver.get('https://www.juso.go.kr/openIndexPage.do')
            time.sleep(1 * (random.random()+1))
            inputAddr = self.driver.find_element(By.ID, 'inputSearchAddr')
            inputAddr.clear()
            inputAddr.send_keys(addr.split(",")[0])
            time.sleep(0.5 * (random.random()+1))
            # 검색 후 페이지 이동 대기
            self.driver.find_element(By.CLASS_NAME, 'btn_search.searchBtn').click()
            time.sleep(2 * (random.random()+1))
            text = self.driver.find_element(By.CLASS_NAME, 'subejct_2').find_element(By.CLASS_NAME, 'roadNameText').text
            time.sleep(0.5 * (random.random()+1))
        finally:
            self.driver.get('https://www.juso.go.kr/openIndexPage.do')
            time.sleep(1 * (random.random()+1))

        return text

    def getAutionitemList(self, fromdate, enddate) -> pd.DataFrame:
        # 물건상세검색 버튼 클릭
        self.driver.find_element(By.XPATH, '/html/body/div/div[1]/div[4]/div[3]/div[2]/div[1]/div[2]/ul/li[1]/div/a').click()
        time.sleep(2 * (random.random()+1))

        # 똑같은 항목들은 한 번만 지정
        # 아파트 지정
        setAPT = Select(self.driver.find_element(By.NAME, 'lclsUtilCd'))
        setAPT.select_by_value("0000802")
        time.sleep(1 * (random.random()+1))
        setAPT = Select(self.driver.find_element(By.NAME, 'mclsUtilCd'))
        setAPT.select_by_value("000080201")
        time.sleep(1 * (random.random()+1))
        setAPT = Select(self.driver.find_element(By.NAME, 'sclsUtilCd'))
        setAPT.select_by_value("00008020104")
        time.sleep(1 * (random.random()+1))
        # 날짜 지정
        time_textbox = self.driver.find_element(By.NAME, 'termStartDt')
        time_textbox.clear()
        time_textbox.send_keys("value", fromdate)
        time.sleep(1 * (random.random()+1))
        time_textbox = self.driver.find_element(By.NAME, 'termEndDt')
        time_textbox.clear()
        time_textbox.send_keys("value", enddate)
        time.sleep(1 * (random.random()+1))

        # 경매법원 별 따로 검색 필요
        courts = ['서울중앙지방법원','서울동부지방법원','서울서부지방법원','서울남부지방법원','서울북부지방법원']
        aution_item = pd.DataFrame()
        for court in courts:
            # 법원 지정
            setCourt = Select(self.driver.find_element(By.ID, 'idJiwonNm'))
            setCourt.select_by_value(court)
            time.sleep(1 * (random.random()+1))
            # 검색 후 페이지 이동 대기
            self.driver.find_element(By.XPATH, '/html/body/div[1]/div[4]/div[2]/form/div[2]/a[1]/img').click()
            time.sleep(2 * (random.random()+1))

            # 페이지 당 10개씩 변경
            if self.driver.find_elements(By.ID, 'ipage'):
                setPage = Select(self.driver.find_element(By.ID, 'ipage'))
                setPage.select_by_value("default10")
                time.sleep(2 * (random.random()+1))
            else:
                # 이전 page 돌아가기 클릭
                self.driver.find_element(By.XPATH, '/html/body/div[1]/div[4]/div[3]/div[4]/form/div/div/a/img').click()
                time.sleep(2 * (random.random()+1))
                continue

            page = 1
            while True:
                # 해당 페이지 물건 저장
                soup = BeautifulSoup(self.driver.page_source,'html.parser')
                table = soup.find('table', attrs={'class':'Ltbl_list'})
                table_rows = table.find_all('tr')
                row_list = []
                for tr in table_rows:
                    td = tr.find_all('td')
                    row = [tr.text for tr in td]
                    row_list.append(row)
                if len(aution_item) == 0: aution_item = pd.DataFrame(row_list).iloc[1:]
                else: aution_item = pd.concat([aution_item, pd.DataFrame(row_list).iloc[1:]], ignore_index=True)
                
                # 다음 페이지로 이동
                page2parent = self.driver.find_element(By.CLASS_NAME, 'page2')
                children = page2parent.find_elements(By.XPATH, '*')
                if page == 1:# page가 1개 밖에 없을때
                    if len(children) == page: break # 종료
                    else: children[page].click() # 다음 page open
                elif page <= 10: # 처음 100개
                    if len(children) - 1 == page: break # 이동 키 포함 마지막 page일 경우 종료
                    else: children[page+1].click()
                else: # 나머지
                    if len(children) - 2 == (page % 10): break # 이동 키 포함 마지막 page일 경우 종료
                    else: children[(page % 10) + 2].click()
                page += 1
                time.sleep(1 * (random.random()+1))
            # 이전 page 돌아가기 클릭
            self.driver.find_element(By.XPATH, '/html/body/div[1]/div[4]/div[3]/div[4]/form[1]/div/div/a[4]/img').click()
            time.sleep(2 * (random.random()+1))

        aution_item = aution_item.iloc[:,1:] # 첫 번째 선택 column은 불필요
        col_list = ['사건번호','물건번호','소재지','비고','감정평가액','날짜']
        aution_item.columns = col_list # col 지정
        for col in col_list: # \t는 전부 제거
            aution_item[col] = aution_item[col].str.replace('\t','')    
        for col in col_list: # 겹치는 \n는 하나로
            aution_item[col] = aution_item[col].apply(lambda x : re.sub(r"\n+", "\n", x))

        aution_item['법원'] = aution_item['사건번호'].str.split('\n').str[1]
        aution_item['사건번호'] = aution_item['사건번호'].str.split('\n').str[2]
        aution_item['용도'] = aution_item['물건번호'].str.split('\n').str[2]
        aution_item['물건번호'] = aution_item['물건번호'].str.split('\n').str[1]
        aution_item['내역'] = aution_item['소재지'].str.split('\n').str[2:].str.join(' ')
        aution_item['소재지'] = aution_item['소재지'].str.split('\n').str[1]
        aution_item['비고'] = aution_item['비고'].str.split('\n').str[1:].str.join('\n')
        aution_item['최저가격'] = aution_item['감정평가액'].str.split('\n').str[2]
        aution_item['최저비율'] = aution_item['감정평가액'].str.split('\n').str[3].str[1:-1] # 괄호 삭제
        aution_item['감정평가액'] = aution_item['감정평가액'].str.split('\n').str[1]
        aution_item['유찰횟수'] = aution_item['날짜'].str.split('\n').str[3]
        aution_item['유찰횟수'] = aution_item['유찰횟수'].str.strip()
        aution_item['유찰횟수'] = np.where(aution_item['유찰횟수'].str.len() == 0, '0회', aution_item['유찰횟수'].str.slice(start=2))
        aution_item['날짜'] = aution_item['날짜'].str.split('\n').str[2]
        aution_item = aution_item[['날짜','법원','사건번호','물건번호','용도','감정평가액','최저가격','최저비율','유찰횟수','소재지','내역','비고']]
        aution_item = aution_item[~aution_item['비고'].str.contains('지분매각')].reset_index(drop=True)

        return aution_item
    
    def getAutionDetail(self, row):
        isImage = dict()
        # 법원 지정
        court = Select(self.driver.find_element(By.ID, 'idJiwonNm'))
        court.select_by_value(row['법원'])
        time.sleep(0.5 * (random.random()+1))
        # 사건 번호
        year = Select(self.driver.find_element(By.ID, 'idSaYear'))
        year.select_by_value(row['사건번호'][:4])
        time.sleep(0.5 * (random.random()+1))
        number = self.driver.find_element(By.ID, 'idSaSer')
        number.clear()
        number.send_keys("value", row['사건번호'][6:])
        time.sleep(0.5 * (random.random()+1))
        # 검색 클릭
        self.driver.find_element(By.XPATH, '/html/body/div[1]/div[4]/div[2]/form/div[2]/div/div[5]/div/a/img').click()
        time.sleep(1 * (random.random()+1))

        # 물건내역 list 불러오기. 다른 표에서도 동일 class_name 활용
        item_list = self.driver.find_elements(By.CLASS_NAME, 'Ltbl_dt')
        for i in item_list:
            if i.get_property('summary') == '물건내역 표': # 다른 표인경우 pass
                if i.find_element(By.XPATH,'./tbody/tr[1]/td[1]').text == row['물건번호']: #동일 물건번호인 경우 물건 상세조회 클릭
                    i.find_element(By.CSS_SELECTOR, '[alt="물건상세조회"]').click()                
                    time.sleep(2 * (random.random()+1))
                    break        
        try: 
            # 사진 저장 위해 임의 사진 클릭
            self.driver.find_element(By.CLASS_NAME, 'photo_td').find_element(By.XPATH, './a').click()
            time.sleep(2 * (random.random()+1))
            # 팝업 창으로 연결 변경
            main = self.driver.window_handles
            self.driver.switch_to.window(main[1])
            time.sleep(1 * (random.random()+1))

            # 사진 종류 중 1순위 전경도, 2순위 개황도, 3순위 내부구조도, 4순위 관련사진, 5순위 현재사진 순으로 확인 후 2장 저장
            filterdd = Select(self.driver.find_element(By.ID, 'idPhotoGbncd'))
            filterList = [x.text for x in filterdd.options]

            for c in range(2):
                if not filterList: pass # 한개도 없으면 현재사진을 저장
                else: # 한개 이상 있으면 새로운 사진을 저장.
                    if '전경도' in filterList: selectimage = '전경도'
                    elif '개황도' in filterList: selectimage = '개황도'
                    elif '내부구조도' in filterList: selectimage = '내부구조도'
                    elif '관련사진' in filterList: selectimage = '관련사진'
                    else: selectimage = filterList[-1]
                    filterdd.select_by_visible_text(selectimage)
                    filterList.remove(selectimage)
                    time.sleep(1 * (random.random()+1))

                imgUrl = self.driver.find_element(By.XPATH, '/html/body/div[1]/div/form/div[2]/table/tbody/tr[1]/td/img').get_attribute("src")
                imgUrl = imgUrl.replace('https', 'http')
                urllib.request.urlretrieve(imgUrl, f'./temp/building_image_{c}.jpg')
                time.sleep(2 * (random.random()+1))
                filterdd = Select(self.driver.find_element(By.ID, 'idPhotoGbncd'))

            # 팝업 종료 후 기존 창으로 복귀
            self.driver.close()
            time.sleep(0.5 * (random.random()+1))
            self.driver.switch_to.window(main[0])
            time.sleep(1 * (random.random()+1))
            isImage['isPic'] = True
        except: # 가끔 사진이 없는 경우가 있으며 해당 경우는 예외 처리 함.
            isImage['isPic'] = False
            print(row['사건번호'], '사진 없음')
            
        # 물건매각명세서가 있으면 클릭. 없으면 예외 처리를 위한 변수 저장 후 pass
        isPapyrus = self.driver.find_elements(By.CSS_SELECTOR, '[alt="매각물건명세서 팝업"]')
        if isPapyrus:
            # 실행 후 대기
            isPapyrus[0].click()
            time.sleep(8 + random.random())
            # 파피루스 프로그램 선택
            for win in pyautogui.getAllWindows():
                if win.title[:7] == 'Papyrus':
                    Papyrus = pyautogui.getWindowsWithTitle(win.title)[0]
                    break
            if not Papyrus.isActive: Papyrus.activate() # 프로그램 활성화
            if not Papyrus.isMaximized: Papyrus.maximize() # 프로그램 최대화
            time.sleep(1 * (random.random()+1))
            pyautogui.click(749,101) # 전체 맞춤
            time.sleep(1 * (random.random()+1))
            pyautogui.click(626,101) # 배율 선택
            time.sleep(0.5 * (random.random()+1))
            pyautogui.hotkey('ctrl', 'a') # 전체 선택
            time.sleep(0.1 * (random.random()+1))
            pyautogui.typewrite('140', interval=0.1 * (random.random()+1)) # 140 입력
            time.sleep(0.1 * (random.random()+1))
            pyautogui.hotkey('enter') # 엔터
            time.sleep(0.5 * (random.random()+1))
            # save screenshot
            p = pyautogui.screenshot()
            p.save(f'./temp/building_info.png')
            # edit screenshot
            im = PIL.Image.open(f'./temp/building_info.png')
            im_crop = im.crop((720,185,1482,967))
            im_crop.save(f'./temp/building_info.png', quality=100)
            Papyrus.close()
            isImage['isinfo'] = True
        else:
            isImage['isinfo'] = False
            print(row['사건번호'], '물건매각명세서 없음')

        # 이전으로 복귀
        self.driver.find_element(By.XPATH, '/html/body/div[1]/div[4]/div[2]/div[4]/div[1]/div/a[2]/img').click()
        time.sleep(2 * (random.random()+1))

        return isImage

  각종 목표에 따라 여러 사이트를 스크래핑하는 코드 클래스다. 앞선 게시글의 기능과 동일하며 새로운 부분은 추가로 언급하고 넘어가고자 한다.

  • startDriver(), endDriver() 함수로 main에서 원하는 url로 chrome을 실행시킬 수 있다. for 문 + df.iterrows()를 자주 사용하는데, df 전체를 argu로 전달하고 싶지 않아서 구현한 기능이다.
  • getAutionitemList() 는 kwg를 받아 argument로 법원 물건 검색 조건을 설정할 수 있다. (구현 X)
  • getAutionDetail()에서 return하는 isImage는 이후 ppt에서 이미지 존재 여부를 전달하기 위한 dictonary.

Visualization Module

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
plt.rcParams['font.family'] ='Malgun Gothic'
plt.rcParams['axes.unicode_minus'] =False

import re, datetime
import PIL

from pptx.util import Cm, Pt
from pptx.enum.text import PP_ALIGN
from pptx import Presentation

def saveTransactionChart(row, trade_data, loan_data):
    trade_data_item = trade_data[trade_data['지역코드'] == str(row['구코드'])]
    trade_data_item = trade_data_item[[all([s in row['소재지'].split(",")[0].split(" ") for s in rc.split(" ")]) for rc in list(trade_data_item['도로명코드'])]]
    
    # 매매나 전월세가 한건도 없으면 패스. 실제로 없는지, 코드오류인지 확인 필요
    if not len(trade_data_item):
        isChart = False
        print(row['사건번호'], '거래 기록 없음')
    else:    
        isChart = True
        loan_data_item = loan_data[(loan_data['지역코드'] == str(row['구코드']))]
        loan_data_item = loan_data_item[(loan_data_item['법정동'] == trade_data_item['법정동'].unique()[0])]
        loan_data_item = loan_data_item[(loan_data_item['지번'] == trade_data_item['지번'].unique()[0])]
        loan_data_item = loan_data_item[(loan_data_item['아파트'] == trade_data_item['아파트'].unique()[0])]
        # 면적은 ± 1m2 의 범위를 설정. 서로 다른 type 도 한 chart에 그릴 수 있게
        trade_data_item = trade_data_item[(float(row['면적']) - 1 <= trade_data_item['전용면적']) & (trade_data_item['전용면적'] <= float(row['면적']) + 1)]
        loan_data_item = loan_data_item[(float(row['면적']) - 1 <= loan_data_item['전용면적']) & (loan_data_item['전용면적'] <= float(row['면적']) + 1)]

        with plt.rc_context({'xtick.color':'dimgrey', 'ytick.color':'dimgrey'}):
            plt.figure(figsize=(9.22*(1/2.54), 6.77*(1/2.54))) # ppt에 들어갈 크기로 고정
            plt.plot(trade_data_item['거래일자'], trade_data_item['거래금액'], 'g.', label = '매매')
            plt.plot(loan_data_item[loan_data_item['전월세'] == '전세']['거래일자'], loan_data_item[loan_data_item['전월세'] == '전세']['보증금액'], 'b.', label = '전세')
            plt.plot(loan_data_item[loan_data_item['전월세'] == '월세']['거래일자'], loan_data_item[loan_data_item['전월세'] == '월세']['보증금액'], 'c.', label = '월세')
            plt.axhline(round(float(re.sub('[^0-9]', "",row['감정평가액'])) / 100000000 , 2),0,1, color = 'k', linestyle = '--', label = '감정가')
            plt.axhline(round(float(re.sub('[^0-9]', "",row['최저가격'])) / 100000000 , 2),0,1, color = 'r', linestyle = '--', label = '최소가')
            plt.xticks(fontsize=7,rotation = 90)
            plt.yticks(fontsize=7)
            plt.xlim([datetime.datetime.strptime("2014-01-01", '%Y-%m-%d'),datetime.datetime.now()])
            plt.gca().set_yticklabels([f'{x}억'for x in plt.gca().get_yticks()])
            plt.grid(linestyle = '--', color = 'darkgrey')
            plt.legend(fontsize=6, loc = 2)
            plt.tight_layout()
            plt.savefig(f'./temp/chart.png')

    return isChart


class customPPTX():
    def __init__(self, template_directory):
        self.prs = Presentation(pptx=template_directory)
        self.layout = self.prs.slide_layouts[0]

    def save(self, directory):
        self.prs.save(directory)

    def setAutionSlide(self, row: pd.Series, isImage:dict):
        # 물건별 슬라이드
        slide = self.prs.slides.add_slide(self.layout)
        # 텍스트 박스 생성
        tfs = []
        tf = slide.shapes.add_textbox(Cm(5.54), Cm(0.56), Cm(4.6), Cm(0.6)).text_frame
        tf.text = row['법원'] # 법원
        tfs.append(tf)
        tf = slide.shapes.add_textbox(Cm(5.54), Cm(1.24), Cm(4.6), Cm(0.6)).text_frame
        tf.text = row['사건번호'] # 사건번호
        tfs.append(tf)
        tf = slide.shapes.add_textbox(Cm(5.54), Cm(1.93), Cm(4.6), Cm(0.6)).text_frame
        tf.text = row['물건번호'] # 물건번호
        tfs.append(tf)
        tf = slide.shapes.add_textbox(Cm(5.54), Cm(2.58), Cm(4.6), Cm(0.6)).text_frame
        tf.text = row['용도'] # 용도
        tfs.append(tf)
        tf = slide.shapes.add_textbox(Cm(5.54), Cm(3.92), Cm(4.6), Cm(0.6)).text_frame
        tf.text = row['최저가격'] # 최저가격
        tfs.append(tf)
        tf = slide.shapes.add_textbox(Cm(0.79), Cm(3.92), Cm(4.6), Cm(0.6)).text_frame
        tf.text = row['감정평가액'] # 감정평가액
        tfs.append(tf)
        tf = slide.shapes.add_textbox(Cm(10.29), Cm(3.92), Cm(3.87), Cm(0.6)).text_frame
        tf.text = row['최저비율'] # 최저비율
        tfs.append(tf)
        tf = slide.shapes.add_textbox(Cm(14.24), Cm(3.91), Cm(3.87), Cm(0.6)).text_frame
        tf.text = row['유찰횟수'] # 유찰횟수
        tfs.append(tf)
        tf = slide.shapes.add_textbox(Cm(18.23), Cm(3.91), Cm(5.69), Cm(0.6)).text_frame
        tf.text = str(row['건폐율']) + "/" + str(row['용적률']) # 건폐율/용적률
        tfs.append(tf)
        tf = slide.shapes.add_textbox(Cm(24.05), Cm(3.91), Cm(3.87), Cm(0.6)).text_frame
        tf.text = str(row['year']) # 사용승인일
        tfs.append(tf)
        tf = slide.shapes.add_textbox(Cm(27.92), Cm(3.91), Cm(5.14), Cm(0.6)).text_frame
        tf.text = str(row['세대수']) # 세대수
        tfs.append(tf)
        tf = slide.shapes.add_textbox(Cm(14.24), Cm(0.57), Cm(18.83), Cm(0.6)).text_frame
        tf.text = row['소재지'] # 소재지
        tfs.append(tf)
        tf = slide.shapes.add_textbox(Cm(14.24), Cm(1.28), Cm(18.83), Cm(0.6)).text_frame
        tf.text = row['내역'] # 내역
        tfs.append(tf)
        tf = slide.shapes.add_textbox(Cm(14.24), Cm(1.9), Cm(18.83), Cm(1.2)).text_frame
        tf.text = row['비고'] # 비고
        tfs.append(tf)
        tf = slide.shapes.add_textbox(Cm(21.75), Cm(5.25), Cm(11.31), Cm(0.6)).text_frame
        tf.text = '' # 임차인
        tfs.append(tf)
        for t in tfs:
            t.paragraphs[0].font.size = Pt(10)
            t.paragraphs[0].alignment = PP_ALIGN.CENTER
        # 지도 추가
        slide.shapes.add_picture("./temp/building_map.png", Cm(1.43), Cm(11.48), height = Cm(6.77))
        # 매각 물건 명세서 추가
        if isImage['isinfo']: slide.shapes.add_picture("./temp/building_info.png", Cm(21.75), Cm(6), height = Cm(11.35))
        if isImage['isPic']: # 사진 2장 추가
            im = PIL.Image.open(f"./temp/building_image_{0}.jpg")
            if im.size[0] > im.size[1] * (8.7/6.77) :slide.shapes.add_picture(f"./temp/building_image_{0}.jpg", Cm(1.43), Cm(4.6), width = Cm(8.7)) # 가로가 더 큰 경우
            else: slide.shapes.add_picture(f"./temp/building_image_{0}.jpg", Cm(1.43), Cm(4.6), height = Cm(6.77)) # 세로가 더 큰 경우

            im = PIL.Image.open(f"./temp/building_image_{1}.jpg")
            if im.size[0] > im.size[1] * (9.22/6.77) : slide.shapes.add_picture(f"./temp/building_image_{1}.jpg", Cm(10.82), Cm(4.6), width = Cm(9.22)) # 가로가 더 큰 경우
            else: slide.shapes.add_picture(f"./temp/building_image_{1}.jpg", Cm(10.82), Cm(4.6), height = Cm(6.77)) # 세로가 더 큰 경우
        # 거래 차트 추가
        if isImage['isChart']: slide.shapes.add_picture(f'./temp/chart.png', Cm(10.89), Cm(11.48))

  거래 이력을 이미지로 저장하는 기능은 클래스가 아닌 함수로 구현했다. ppt 생성은 클래스로 구현하여 main에서 메서드를 통해 신규 슬라이드 생성 및 저장이 가능하게끔 구현했다.

이후 목표

  • 경매 결과 스크래핑 구현 예정이다.

 

728x90
반응형

+ Recent posts